Skip to content

Commit

Permalink
feat: filter activity support
Browse files Browse the repository at this point in the history
  • Loading branch information
arjendev committed Nov 24, 2023
1 parent ca4af3c commit 1ba6e7b
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from azure_data_factory_testing_framework.models.activities.activity import Activity
from azure_data_factory_testing_framework.models.activities.execute_pipeline_activity import ExecutePipelineActivity
from azure_data_factory_testing_framework.models.activities.filter_activity import FilterActivity
from azure_data_factory_testing_framework.models.activities.for_each_activity import ForEachActivity
from azure_data_factory_testing_framework.models.activities.if_condition_activity import IfConditionActivity
from azure_data_factory_testing_framework.models.activities.set_variable_activity import SetVariableActivity
Expand Down Expand Up @@ -33,8 +34,9 @@ def _get_activity_from_activity_data(activity_data: dict) -> Activity:
case_value = case["value"]
activities = case["activities"]
cases_activities[case_value] = _get_activity_from_activities_data(activities)

return SwitchActivity(default_activities=default_activities, cases_activities=cases_activities, **activity_data)
elif activity_data["type"] == "Filter":
return FilterActivity(**activity_data)
else:
return Activity(**activity_data)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class FunctionsRepository:
"sub": math_functions.sub,
"div": math_functions.div,
"greaterOrEquals": logical_functions.greater_or_equals,
"lessOrEquals": logical_functions.less_or_equals,
"not": logical_functions.not_,
"empty": collection_functions.empty,
"split": string_functions.split,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Any

from azure_data_factory_testing_framework.models.activities.control_activity import ControlActivity
from azure_data_factory_testing_framework.models.data_factory_element import DataFactoryElement
from azure_data_factory_testing_framework.state import PipelineRunState
from azure_data_factory_testing_framework.state.dependency_condition import DependencyCondition


class FilterActivity(ControlActivity):
def __init__(
self,
**kwargs: Any, # noqa: ANN401
) -> None:
"""This is the class that represents the If Condition activity in the pipeline.
Args:
if_true_activities: The deserialized activities that will be executed if the condition is true.
if_false_activities: The deserialized activities that will be executed if the condition is false.
**kwargs: FilterActivity properties coming directly from the json representation of the activity.
"""
kwargs["type"] = "Filter"

super(ControlActivity, self).__init__(**kwargs)

self.items: DataFactoryElement = self.type_properties["items"]
self.condition: DataFactoryElement = self.type_properties["condition"]

def evaluate(self, state: PipelineRunState) -> "FilterActivity":
value = []
for item in self.items.evaluate(state):
state.iteration_item = item
if self.condition.evaluate(state):
value.append(item)

self.set_result(DependencyCondition.SUCCEEDED, {"value": value})
state.iteration_item = None

return self
4 changes: 4 additions & 0 deletions tests/functional/filter_activity_pipeline/item.metadata.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "Pipeline",
"displayName": "filter-test"
}
63 changes: 63 additions & 0 deletions tests/functional/filter_activity_pipeline/pipeline-content.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"name": "filter-test",
"properties": {
"activities": [
{
"name": "Filter1",
"type": "Filter",
"dependsOn": [],
"typeProperties": {
"items": {
"value": "@pipeline().parameters.input_values",
"type": "Expression"
},
"condition": {
"value": "@lessOrEquals(item(), 3)",
"type": "Expression"
}
}
},
{
"name": "Set variable1",
"type": "SetVariable",
"dependsOn": [
{
"activity": "Filter1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"variableName": "filtered_values",
"value": {
"value": "@activity('Filter1').output.value",
"type": "Expression"
}
}
}
],
"parameters": {
"input_values": {
"type": "array",
"defaultValue": [
1,
2,
3,
4,
5
]
}
},
"variables": {
"filtered_values": {
"type": "Array"
}
},
"annotations": []
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import pytest
from azure_data_factory_testing_framework.models.activities.activity import Activity
from azure_data_factory_testing_framework.models.activities.filter_activity import FilterActivity
from azure_data_factory_testing_framework.state import RunParameter, RunParameterType
from azure_data_factory_testing_framework.test_framework import TestFramework, TestFrameworkType


@pytest.mark.parametrize(
"input_values,expected_filtered_values",
[
([1, 2, 3, 4, 5], [1, 2, 3]),
([], []),
([4], []),
([3, 4, 5, 6], [3]),
([4, 5, 6], []),
([-1, 3, 4], [-1, 3]),
],
)
def test_filter_activity(input_values: [], expected_filtered_values: [], request: pytest.FixtureRequest) -> None:
# Arrange
test_framework = TestFramework(
framework_type=TestFrameworkType.Fabric,
root_folder_path=request.fspath.dirname,
should_evaluate_child_pipelines=True,
)
pipeline = test_framework.repository.get_pipeline_by_name("filter-test")

# Act
activities = test_framework.evaluate_pipeline(
pipeline,
[
RunParameter(RunParameterType.Pipeline, "input_values", input_values),
],
)

# Assert
activity: FilterActivity = next(activities)
assert activity.type == "Filter"
assert activity.items.value == input_values
assert activity.output["value"] == expected_filtered_values

activity: Activity = next(activities)
assert activity.type == "SetVariable"
assert activity.type_properties["value"].value == expected_filtered_values
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from azure_data_factory_testing_framework.models.activities.filter_activity import FilterActivity
from azure_data_factory_testing_framework.models.data_factory_element import DataFactoryElement
from azure_data_factory_testing_framework.models.pipeline import Pipeline
from azure_data_factory_testing_framework.state import RunParameter, RunParameterType
from azure_data_factory_testing_framework.test_framework import TestFramework, TestFrameworkType


def test_filter_activity_on_range_of_values() -> None:
# Arrange
test_framework = TestFramework(framework_type=TestFrameworkType.Fabric)
pipeline = Pipeline(
name="pipeline",
parameters={
"input_values": {
"type": "Array",
"defaultValue": [],
},
},
variables={},
activities=[
FilterActivity(
name="FilterActivity",
typeProperties={
"items": DataFactoryElement("@pipeline().parameters.input_values"),
"condition": DataFactoryElement("@lessOrEquals(item(), 3)"),
},
),
],
)

# Act
activities = test_framework.evaluate_pipeline(
pipeline,
[
RunParameter(RunParameterType.Pipeline, "input_values", [1, 2, 3, 4, 5]),
],
)

# Assert
activity = next(activities)
assert activity.type == "Filter"
assert activity.type_properties["items"].value == [1, 2, 3, 4, 5]
assert activity.output["value"] == [1, 2, 3]

0 comments on commit 1ba6e7b

Please sign in to comment.