Skip to content

Commit

Permalink
Merge pull request #14 from great-expectations/f/core-765/use_validation_definition
Browse files Browse the repository at this point in the history

Use ValidationDefinition.run
  • Loading branch information
joshua-stauffer authored Jan 10, 2025
2 parents fc333fb + b6e24cf commit 081b29c
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 62 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ GX validates it against the provided Expectations. It has two required parameter
- `expect` is either a single Expectation or an ExpectationSuite

Optionally, you can also pass a `result_format` parameter to control the verbosity of the output.
The GXValidateDataFrameOperator will return a serialized ExpectationValidationResult, or ExpectationSuiteValidationResult.
The GXValidateDataFrameOperator will return a serialized ExpectationSuiteValidationResult.

## GXValidateBatchOperator
This Operator is similar to the GXValidateDataFrameOperator, except that GX is responsible
Expand All @@ -20,7 +20,7 @@ Its required parameters are:

Optionally, you can also pass a `result_format` parameter to control the verbosity of the output, and
`batch_parameters` to specify a batch of data at runtime.
The GXValidateBatchOperator will return a serialized ExpectationValidationResult, or ExpectationSuiteValidationResult.
The GXValidateBatchOperator will return a serialized ExpectationSuiteValidationResult.

## GXValidateCheckpointOperator
This Operator can take advantage of all the features of GX. The user configures a `Checkpoint`,
Expand Down
Empty file.
47 changes: 47 additions & 0 deletions great_expectations_provider/common/gx_context_actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Literal

if TYPE_CHECKING:
from great_expectations import ExpectationSuite
from great_expectations.core.batch_definition import BatchDefinition
from great_expectations.core.expectation_validation_result import (
ExpectationSuiteValidationResult,
)
from great_expectations.data_context import AbstractDataContext
from great_expectations.expectations import Expectation


def run_validation_definition(
    task_id: str,
    expect: Expectation | ExpectationSuite,
    batch_definition: BatchDefinition,
    result_format: Literal["BOOLEAN_ONLY", "BASIC", "SUMMARY", "COMPLETE"] | None,
    batch_parameters: dict,
    gx_context: AbstractDataContext,
) -> ExpectationSuiteValidationResult:
    """Given a BatchDefinition and an Expectation or ExpectationSuite, ensure a
    ValidationDefinition and run it.

    Args:
        task_id: Unique name used for the generated ExpectationSuite (when a
            bare Expectation is passed) and for the ValidationDefinition.
        expect: A single Expectation (wrapped in a one-item suite) or a full
            ExpectationSuite to validate the batch against.
        batch_definition: Identifies the data to validate.
        result_format: Verbosity of the validation result; when None, GX's
            default result format is used.
        batch_parameters: Runtime parameters selecting/supplying the batch
            (e.g. a dataframe for whole-dataframe batch definitions).
        gx_context: The GX data context in which the ValidationDefinition is
            persisted.

    Returns:
        The ExpectationSuiteValidationResult produced by the run.
    """
    import great_expectations as gx

    # A bare Expectation must be wrapped in a suite before it can be attached
    # to a ValidationDefinition; name it after the task for traceability.
    if isinstance(expect, gx.expectations.Expectation):
        suite = gx.ExpectationSuite(name=task_id, expectations=[expect])
    else:
        suite = expect
    # add_or_update keeps repeated task runs idempotent.
    validation_definition = gx_context.validation_definitions.add_or_update(
        validation=gx.ValidationDefinition(
            name=task_id,
            suite=suite,
            data=batch_definition,
        ),
    )
    # Only forward result_format when explicitly set so that GX applies its
    # own default otherwise (avoids duplicating the run() call per branch).
    run_kwargs: dict = {"batch_parameters": batch_parameters}
    if result_format:
        run_kwargs["result_format"] = result_format
    return validation_definition.run(**run_kwargs)
19 changes: 12 additions & 7 deletions great_expectations_provider/operators/validate_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

from airflow.models import BaseOperator

from great_expectations_provider.common.gx_context_actions import (
run_validation_definition,
)

if TYPE_CHECKING:
from airflow.utils.context import Context
from great_expectations import ExpectationSuite
Expand Down Expand Up @@ -41,11 +45,12 @@ def execute(self, context: Context) -> dict:

gx_context = gx.get_context(mode=self.context_type)
batch_definition = self.configure_batch_definition(gx_context)
batch = batch_definition.get_batch(batch_parameters=self.batch_parameters)
if self.result_format:
result = batch.validate(
expect=self.expect, result_format=self.result_format
)
else:
result = batch.validate(expect=self.expect)
result = run_validation_definition(
task_id=self.task_id,
expect=self.expect,
batch_definition=batch_definition,
result_format=self.result_format,
batch_parameters=self.batch_parameters,
gx_context=gx_context,
)
return result.describe_dict()
48 changes: 34 additions & 14 deletions great_expectations_provider/operators/validate_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@

from airflow.models import BaseOperator

from great_expectations_provider.common.gx_context_actions import (
run_validation_definition,
)

if TYPE_CHECKING:
from airflow.utils.context import Context
from great_expectations import ExpectationSuite
from great_expectations.core.batch_definition import BatchDefinition
from great_expectations.data_context import AbstractDataContext
from great_expectations.expectations import Expectation
from pandas import DataFrame

Expand All @@ -33,20 +39,34 @@ def execute(self, context: Context) -> dict:
import great_expectations as gx

gx_context = gx.get_context(mode=self.context_type)
batch = (
gx_context.data_sources.add_pandas(name=self.task_id)
batch_definition = self._get_pandas_batch_definition(gx_context)
batch_parameters = {
"dataframe": self.dataframe,
}
result = run_validation_definition(
task_id=self.task_id,
expect=self.expect,
batch_definition=batch_definition,
result_format=self.result_format,
batch_parameters=batch_parameters,
gx_context=gx_context,
)
return result.describe_dict()

def _get_spark_batch_definition(
    self, gx_context: AbstractDataContext
) -> BatchDefinition:
    """Register a spark data source/asset named after the task and return a
    whole-dataframe BatchDefinition for it."""
    data_source = gx_context.data_sources.add_or_update_spark(name=self.task_id)
    asset = data_source.add_dataframe_asset(name=self.task_id)
    return asset.add_batch_definition_whole_dataframe(name=self.task_id)

def _get_pandas_batch_definition(
    self, gx_context: AbstractDataContext
) -> BatchDefinition:
    """Register a pandas data source/asset named after the task and return a
    whole-dataframe BatchDefinition for it."""
    data_source = gx_context.data_sources.add_or_update_pandas(name=self.task_id)
    asset = data_source.add_dataframe_asset(name=self.task_id)
    return asset.add_batch_definition_whole_dataframe(name=self.task_id)
if self.result_format:
result = batch.validate(
expect=self.expect, result_format=self.result_format
)
else:
result = batch.validate(expect=self.expect)
return result.describe_dict()
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ packages = find_namespace:
include_package_data = true
install_requires =
apache-airflow>=2.1
great-expectations[snowflake,postgresql,mssql,bigquery,athena,spark,gcp,azure,s3]>=1.0.0
great-expectations[snowflake,postgresql,mssql,bigquery,athena,spark,gcp,azure,s3]>=1.3.1

[options.extras_require]
tests =
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ def rand_name() -> str:
return "".join(random.choices(string.ascii_lowercase, k=10))


def is_valid_gx_cloud_url(url: str) -> bool:
    """Return True if *url* looks like a GX Cloud organization result URL."""
    gx_cloud_prefix = "https://app.greatexpectations.io/organizations/"
    return url.startswith(gx_cloud_prefix)


@pytest.fixture
def table_name() -> str:
return "test_table"
Expand Down
9 changes: 8 additions & 1 deletion tests/integration/test_validate_batch_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
class TestValidateBatchOperator:
COL_NAME = "my_column"

def test_with_cloud_context(self, ensure_data_source_cleanup) -> None:
def test_with_cloud_context(
self,
ensure_data_source_cleanup: Callable[[str], None],
ensure_suite_cleanup: Callable[[str], None],
ensure_validation_definition_cleanup: Callable[[str], None],
) -> None:
task_id = f"validate_batch_cloud_integration_test_{rand_name()}"
ensure_data_source_cleanup(task_id)
ensure_suite_cleanup(task_id)
ensure_validation_definition_cleanup(task_id)
dataframe = pd.DataFrame({self.COL_NAME: ["a", "b", "c"]})
expect = gxe.ExpectColumnValuesToBeInSet(
column=self.COL_NAME,
Expand Down
7 changes: 2 additions & 5 deletions tests/integration/test_validate_checkpoint_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from great_expectations_provider.operators.validate_checkpoint import (
GXValidateCheckpointOperator,
)
from integration.conftest import rand_name
from integration.conftest import is_valid_gx_cloud_url, rand_name


class TestValidateCheckpointOperator:
Expand Down Expand Up @@ -116,10 +116,7 @@ def test_with_cloud_context(

assert result["success"] is True
# make sure we have something that looks like a valid result url
assert (
"https://app.greatexpectations.io/organizations/"
in result["validation_results"][0]["result_url"]
)
assert is_valid_gx_cloud_url(result["validation_results"][0]["result_url"])

def test_with_file_context(
self,
Expand Down
14 changes: 10 additions & 4 deletions tests/integration/test_validate_dataframe_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,35 @@
from great_expectations_provider.operators.validate_dataframe import (
GXValidateDataFrameOperator,
)
from integration.conftest import rand_name
from integration.conftest import is_valid_gx_cloud_url, rand_name


class TestGXValidateDataFrameOperator:
def test_validate_dataframe_with_cloud(
self, ensure_data_source_cleanup: Callable[[str], None]
self,
ensure_data_source_cleanup: Callable[[str], None],
ensure_suite_cleanup: Callable[[str], None],
ensure_validation_definition_cleanup: Callable[[str], None],
) -> None:
# arrange
column_name = "col_A"
task_id = f"test_validate_dataframe_with_cloud_{rand_name()}"
ensure_data_source_cleanup(task_id)

def configure_dataframe() -> pd.DataFrame:
return pd.DataFrame({column_name: ["a", "b", "c"]})

expect = ExpectationSuite(
name="test suite",
name=task_id,
expectations=[
ExpectColumnValuesToBeInSet(
column=column_name,
value_set=["a", "b", "c", "d", "e"], # type: ignore[arg-type]
),
],
)
ensure_data_source_cleanup(task_id)
ensure_suite_cleanup(task_id)
ensure_validation_definition_cleanup(task_id)

validate_df = GXValidateDataFrameOperator(
context_type="cloud",
Expand All @@ -44,3 +49,4 @@ def configure_dataframe() -> pd.DataFrame:

# assert
assert result["success"] is True
assert is_valid_gx_cloud_url(result["result_url"])
21 changes: 21 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import Generator
from unittest.mock import Mock

import pytest
from great_expectations.expectations import Expectation
from pytest_mock import MockerFixture


@pytest.fixture
def mock_gx(mocker: MockerFixture) -> Generator[Mock, None, None]:
    """Globally replace the ``great_expectations`` module with a Mock.

    Airflow constraints force GX to be imported locally inside each Operator,
    which makes patching the GX namespace awkward; swapping the module entry
    in ``sys.modules`` patches it everywhere instead.

    Known limitation of this approach: isinstance checks fail against mocks.
    """
    fake_gx = Mock()
    # Attach the real class because isinstance() cannot match a Mock.
    fake_gx.expectations.Expectation = Expectation
    mocker.patch.dict("sys.modules", {"great_expectations": fake_gx})
    yield fake_gx
Loading

0 comments on commit 081b29c

Please sign in to comment.