Skip to content

Commit

Permalink
feat(metric): numeric metric minimum calculation (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
niyasrad authored Aug 12, 2023
1 parent 887e6c2 commit 6cd9646
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 1 deletion.
24 changes: 24 additions & 0 deletions datachecks/core/datasource/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ def query_get_document_count(self, index_name: str, filters: Dict = None) -> int
:return: count of documents
"""
raise NotImplementedError("query_get_document_count method is not implemented")

def query_get_min(self, index_name: str, field: str, filters: Dict = None) -> int:
    """
    Get the min value of a field in an index.

    This base implementation is only a placeholder and always raises.

    :param index_name: name of the index
    :param field: field name
    :param filters: optional filter, a dict as in ``query_get_document_count``
    :return: min value
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError("query_get_min method is not implemented")

def query_get_max(self, index_name: str, field: str, filters: str = None) -> int:
"""
Expand Down Expand Up @@ -131,6 +141,20 @@ def query_get_row_count(self, table: str, filters: str = None) -> int:

return self.connection.execute(text(query)).fetchone()[0]

def query_get_min(self, table: str, field: str, filters: str = None) -> int:
    """
    Fetch the smallest value stored in a column.

    :param table: table name
    :param field: column name
    :param filters: optional condition, appended to the query after WHERE
    :return: first column of the first row returned by the MIN query
    """
    # The WHERE part is only added when a non-empty filter is supplied.
    where_clause = f" WHERE {filters}" if filters else ""
    statement = text(f"SELECT MIN({field}) FROM {table}{where_clause}")

    first_row = self.connection.execute(statement).fetchone()
    return first_row[0]

def query_get_max(self, table: str, field: str, filters: str = None) -> int:
"""
Get the max value
Expand Down
15 changes: 15 additions & 0 deletions datachecks/core/datasource/opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ def query_get_document_count(self, index_name: str, filters: Dict = None) -> int
body = {"query": filters} if filters else {}
response = self.client.count(index=index_name, body=body)
return response["count"]

def query_get_min(self, index_name: str, field: str, filters: Dict = None) -> int:
    """
    Get the min value of a field using a ``min`` aggregation.

    :param index_name: name of the index to search
    :param field: field the aggregation is computed on
    :param filters: optional query; when given it is sent as the "query"
        part of the search body
    :return: the ``value`` of the ``min_value`` aggregation in the response
    """
    # Only the aggregation result is read below, so request zero hits
    # instead of also receiving the default page of matching documents.
    query = {"size": 0, "aggs": {"min_value": {"min": {"field": field}}}}
    if filters:
        query["query"] = filters

    response = self.client.search(index=index_name, body=query)
    return response["aggregations"]["min_value"]["value"]

def query_get_max(self, index_name: str, field: str, filters: Dict = None) -> int:
"""
Expand Down
1 change: 1 addition & 0 deletions datachecks/core/metric/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
class MetricsType(str, Enum):
ROW_COUNT = "row_count"
DOCUMENT_COUNT = "document_count"
MIN = "min"
MAX = "max"
AVG = "avg"
FRESHNESS = "freshness"
Expand Down
33 changes: 33 additions & 0 deletions datachecks/core/metric/numeric_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,39 @@ def _generate_metric_value(self):
else:
raise ValueError("Invalid data source type")

class MinMetric(FieldMetrics):
    """
    MinMetric is a metric whose value is the minimum of a field, as reported by the data source.
    """

    def get_metric_identity(self):
        """Build the identity of this metric from its name, data source, field and table/index."""
        return MetricIdentity.generate_identity(
            metric_type=MetricsType.MIN,
            metric_name=self.name,
            data_source=self.data_source,
            field_name=self.field_name,
            # Falsy table/index names are passed on as None.
            table_name=self.table_name or None,
            index_name=self.index_name or None,
        )

    def _generate_metric_value(self):
        """
        Ask the data source for the minimum of the field.

        SQL sources are queried by table name, search-index sources by index name.
        Any other data source type raises ValueError.
        """
        source = self.data_source

        if isinstance(source, SQLDatasource):
            return source.query_get_min(
                table=self.table_name,
                field=self.field_name,
                filters=self.filter_query or None,
            )

        if isinstance(source, SearchIndexDataSource):
            return source.query_get_min(
                index_name=self.index_name,
                field=self.field_name,
                filters=self.filter_query or None,
            )

        raise ValueError("Invalid data source type")



class MaxMetric(FieldMetrics):

Expand Down
58 changes: 57 additions & 1 deletion tests/core/metric/test_numeric_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
OpenSearchSearchIndexDataSource
from datachecks.core.datasource.postgres import PostgresSQLDatasource
from datachecks.core.metric.base import MetricsType
from datachecks.core.metric.numeric_metric import (DocumentCountMetric,
from datachecks.core.metric.numeric_metric import (DocumentCountMetric, MinMetric,
MaxMetric, AvgMetric, RowCountMetric)
from tests.utils import create_opensearch_client, create_postgres_connection

Expand Down Expand Up @@ -150,6 +150,62 @@ def test_should_return_row_count_metric_with_filter(
row_value = row.get_value()
assert row_value["value"] == 3

@pytest.mark.usefixtures("setup_data", "postgres_datasource", "opensearch_datasource")
class TestMinColumnValueMetric:
    """Checks MinMetric on the "age" field for Postgres and OpenSearch, with and without filters."""

    def test_should_return_min_column_value_postgres_without_filter(
        self, postgres_datasource: PostgresSQLDatasource
    ):
        """Unfiltered minimum of "age" from the Postgres table."""
        metric = MinMetric(
            name="min_metric_test",
            data_source=postgres_datasource,
            table_name="numeric_metric_test",
            metric_type=MetricsType.MIN,
            field_name="age",
        )
        assert metric.get_value()["value"] == 30

    def test_should_return_min_column_value_postgres_with_filter(
        self, postgres_datasource: PostgresSQLDatasource
    ):
        """Minimum of "age" from the Postgres table, restricted by a where clause."""
        metric = MinMetric(
            name="min_metric_test_1",
            data_source=postgres_datasource,
            table_name="numeric_metric_test",
            metric_type=MetricsType.MIN,
            field_name="age",
            filters={"where_clause": "age >= 100 AND age <= 200"},
        )
        assert metric.get_value()["value"] == 110

    def test_should_return_min_column_value_opensearch_without_filter(
        self, opensearch_datasource: OpenSearchSearchIndexDataSource
    ):
        """Unfiltered minimum of "age" from the OpenSearch index."""
        metric = MinMetric(
            name="min_metric_test",
            data_source=opensearch_datasource,
            index_name="numeric_metric_test",
            metric_type=MetricsType.MIN,
            field_name="age",
        )
        assert metric.get_value()["value"] == 30

    def test_should_return_min_column_value_opensearch_with_filter(
        self, opensearch_datasource: OpenSearchSearchIndexDataSource
    ):
        """Minimum of "age" from the OpenSearch index, restricted by a range query."""
        metric = MinMetric(
            name="min_metric_test_1",
            data_source=opensearch_datasource,
            index_name="numeric_metric_test",
            metric_type=MetricsType.MIN,
            field_name="age",
            filters={"search_query": '{"range": {"age": {"gte": 100, "lte": 200}}}'},
        )
        assert metric.get_value()["value"] == 110


@pytest.mark.usefixtures("setup_data", "postgres_datasource", "opensearch_datasource")
class TestMaxColumnValueMetric:
Expand Down

0 comments on commit 6cd9646

Please sign in to comment.