diff --git a/datachecks/core/datasource/base.py b/datachecks/core/datasource/base.py index 68506551..cfd3568f 100644 --- a/datachecks/core/datasource/base.py +++ b/datachecks/core/datasource/base.py @@ -67,6 +67,16 @@ def query_get_document_count(self, index_name: str, filters: Dict = None) -> int :return: count of documents """ raise NotImplementedError("query_get_document_count method is not implemented") + + def query_get_min(self, index_name: str, field: str, filters: str = None) -> int: + """ + Get the min value + :param index_name: name of the index + :param field: field name + :param filters: optional filter + :return: min value + """ + raise NotImplementedError("query_get_min method is not implemented") def query_get_max(self, index_name: str, field: str, filters: str = None) -> int: """ @@ -131,6 +141,20 @@ def query_get_row_count(self, table: str, filters: str = None) -> int: return self.connection.execute(text(query)).fetchone()[0] + def query_get_min(self, table: str, field: str, filters: str = None) -> int: + """ + Get the min value + :param table: table name + :param field: column name + :param filters: filter condition + :return: + """ + query = "SELECT MIN({}) FROM {}".format(field, table) + if filters: + query += " WHERE {}".format(filters) + + return self.connection.execute(text(query)).fetchone()[0] + def query_get_max(self, table: str, field: str, filters: str = None) -> int: """ Get the max value diff --git a/datachecks/core/datasource/opensearch.py b/datachecks/core/datasource/opensearch.py index 7acf4f23..dcc0ad74 100644 --- a/datachecks/core/datasource/opensearch.py +++ b/datachecks/core/datasource/opensearch.py @@ -69,6 +69,21 @@ def query_get_document_count(self, index_name: str, filters: Dict = None) -> int body = {"query": filters} if filters else {} response = self.client.count(index=index_name, body=body) return response["count"] + + def query_get_min(self, index_name: str, field: str, filters: Dict = None) -> int: + """ + Get the min 
value of a field + :param index_name: + :param field: + :param filters: + :return: + """ + query = {"aggs": {"min_value": {"min": {"field": field}}}} + if filters: + query["query"] = filters + + response = self.client.search(index=index_name, body=query) + return response["aggregations"]["min_value"]["value"] def query_get_max(self, index_name: str, field: str, filters: Dict = None) -> int: """ diff --git a/datachecks/core/metric/base.py b/datachecks/core/metric/base.py index 3977cad5..9f68c147 100644 --- a/datachecks/core/metric/base.py +++ b/datachecks/core/metric/base.py @@ -26,6 +26,7 @@ class MetricsType(str, Enum): ROW_COUNT = "row_count" DOCUMENT_COUNT = "document_count" + MIN = "min" MAX = "max" AVG = "avg" FRESHNESS = "freshness" diff --git a/datachecks/core/metric/numeric_metric.py b/datachecks/core/metric/numeric_metric.py index ab4351a3..7a914fb5 100644 --- a/datachecks/core/metric/numeric_metric.py +++ b/datachecks/core/metric/numeric_metric.py @@ -72,6 +72,39 @@ def _generate_metric_value(self): else: raise ValueError("Invalid data source type") +class MinMetric(FieldMetrics): + + """ + MinMetric is a class that represents a metric that is generated by a data source. 
+ """ + + def get_metric_identity(self): + return MetricIdentity.generate_identity( + metric_type=MetricsType.MIN, + metric_name=self.name, + data_source=self.data_source, + field_name=self.field_name, + table_name=self.table_name if self.table_name else None, + index_name=self.index_name if self.index_name else None, + ) + + def _generate_metric_value(self): + if isinstance(self.data_source, SQLDatasource): + return self.data_source.query_get_min( + table=self.table_name, + field=self.field_name, + filters=self.filter_query if self.filter_query else None, + ) + elif isinstance(self.data_source, SearchIndexDataSource): + return self.data_source.query_get_min( + index_name=self.index_name, + field=self.field_name, + filters=self.filter_query if self.filter_query else None, + ) + else: + raise ValueError("Invalid data source type") + + class MaxMetric(FieldMetrics): diff --git a/tests/core/metric/test_numeric_metric.py b/tests/core/metric/test_numeric_metric.py index 1e692b9b..9d1683b3 100644 --- a/tests/core/metric/test_numeric_metric.py +++ b/tests/core/metric/test_numeric_metric.py @@ -22,7 +22,7 @@ OpenSearchSearchIndexDataSource from datachecks.core.datasource.postgres import PostgresSQLDatasource from datachecks.core.metric.base import MetricsType -from datachecks.core.metric.numeric_metric import (DocumentCountMetric, +from datachecks.core.metric.numeric_metric import (DocumentCountMetric, MinMetric, MaxMetric, AvgMetric, RowCountMetric) from tests.utils import create_opensearch_client, create_postgres_connection @@ -150,6 +150,62 @@ def test_should_return_row_count_metric_with_filter( row_value = row.get_value() assert row_value["value"] == 3 +@pytest.mark.usefixtures("setup_data", "postgres_datasource", "opensearch_datasource") +class TestMinColumnValueMetric: + def test_should_return_min_column_value_postgres_without_filter( + self, postgres_datasource: PostgresSQLDatasource + ): + row = MinMetric( + name="min_metric_test", + 
data_source=postgres_datasource, + table_name="numeric_metric_test", + metric_type=MetricsType.MIN, + field_name="age", + ) + row_value = row.get_value() + assert row_value["value"] == 30 + + def test_should_return_min_column_value_postgres_with_filter( + self, postgres_datasource: PostgresSQLDatasource + ): + row = MinMetric( + name="min_metric_test_1", + data_source=postgres_datasource, + table_name="numeric_metric_test", + metric_type=MetricsType.MIN, + field_name="age", + filters={"where_clause": "age >= 100 AND age <= 200"}, + ) + row_value = row.get_value() + assert row_value["value"] == 110 + + def test_should_return_min_column_value_opensearch_without_filter( + self, opensearch_datasource: OpenSearchSearchIndexDataSource + ): + row = MinMetric( + name="min_metric_test", + data_source=opensearch_datasource, + index_name="numeric_metric_test", + metric_type=MetricsType.MIN, + field_name="age", + ) + row_value = row.get_value() + assert row_value["value"] == 30 + + def test_should_return_min_column_value_opensearch_with_filter( + self, opensearch_datasource: OpenSearchSearchIndexDataSource + ): + row = MinMetric( + name="min_metric_test_1", + data_source=opensearch_datasource, + index_name="numeric_metric_test", + metric_type=MetricsType.MIN, + field_name="age", + filters={"search_query": '{"range": {"age": {"gte": 100, "lte": 200}}}'}, + ) + row_value = row.get_value() + assert row_value["value"] == 110 + @pytest.mark.usefixtures("setup_data", "postgres_datasource", "opensearch_datasource") class TestMaxColumnValueMetric: