From 9d1de8345a5266db7f28ab49565c29aff5ed0e68 Mon Sep 17 00:00:00 2001 From: Jacob Snarr Date: Mon, 2 Dec 2024 18:08:13 -0500 Subject: [PATCH] feat(sdk-python): Port in-line expressions to Python SDK (#1166) Ports #1076 and #1164 SDK changes to Python. --- .../littlehorse/sdk/wfsdk/WfRunVariable.java | 2 +- .../sdk/wfsdk/internal/WfRunVariableImpl.java | 34 +-- .../internal/WorkflowThreadImplTest.java | 10 +- sdk-python/examples/basic/example_basic.py | 2 +- sdk-python/littlehorse/workflow.py | 234 +++++++++++++++--- sdk-python/tests/test_utils.py | 2 +- sdk-python/tests/test_workflow.py | 168 +++++++++---- server/src/test/java/e2e/BasicTest.java | 2 +- server/src/test/java/e2e/ExpressionTest.java | 18 +- 9 files changed, 362 insertions(+), 110 deletions(-) diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java index a5a1002a5..7f17f0ef3 100644 --- a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java +++ b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/WfRunVariable.java @@ -214,5 +214,5 @@ public interface WfRunVariable extends LHExpression { * provided by the Json Path is mutated. * @param rhs is the value to set this WfRunVariable to. */ - void assignTo(Serializable rhs); + void assign(Serializable rhs); } diff --git a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java index 48f6bd5ec..4f408af79 100644 --- a/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java +++ b/sdk-java/src/main/java/io/littlehorse/sdk/wfsdk/internal/WfRunVariableImpl.java @@ -37,9 +37,14 @@ class WfRunVariableImpl implements WfRunVariable { public WfRunVariableImpl(String name, Object typeOrDefaultVal, WorkflowThreadImpl parent) { this.name = name; - this.typeOrDefaultVal = typeOrDefaultVal; this.parent = parent; + if (typeOrDefaultVal == null) { + throw new IllegalArgumentException( + "The 'typeOrDefaultVal' argument must be either a VariableType or a default value, but a null value was provided."); + } + this.typeOrDefaultVal = typeOrDefaultVal; + // As per GH Issue #582, the default is now PRIVATE_VAR. this.accessLevel = WfRunVariableAccessLevel.PRIVATE_VAR; initializeType(); @@ -49,13 +54,8 @@ private void initializeType() { if (typeOrDefaultVal instanceof VariableType) { this.type = (VariableType) typeOrDefaultVal; } else { - try { - this.defaultValue = LHLibUtil.objToVarVal(typeOrDefaultVal); - this.type = LHLibUtil.fromValueCase(defaultValue.getValueCase()); - } catch (LHSerdeError e) { - throw new IllegalArgumentException( - "Was unable to convert provided default value to LH Variable Type", e); - } + setDefaultValue(typeOrDefaultVal); + this.type = LHLibUtil.fromValueCase(defaultValue.getValueCase()); } } @@ -98,15 +98,21 @@ public WfRunVariable required() { @Override public WfRunVariable withDefault(Object defaultVal) { + setDefaultValue(defaultVal); + + if (!LHLibUtil.fromValueCase(defaultValue.getValueCase()).equals(type)) { + throw new IllegalArgumentException("Default value type does not match LH variable type " + type); + } + + return this; + } + + private void setDefaultValue(Object defaultVal) { try { - VariableValue attempt = LHLibUtil.objToVarVal(defaultVal); - if (!LHLibUtil.fromValueCase(attempt.getValueCase()).equals(type)) { - throw new IllegalArgumentException("Default value type does not match variable type"); - } + this.defaultValue = LHLibUtil.objToVarVal(defaultVal); } catch (LHSerdeError e) { throw new IllegalArgumentException("Was unable to convert provided default value to LH Variable Type", e); } - return this; } @Override @@ -175,7 +181,7 @@ public WorkflowConditionImpl isNotIn(Serializable rhs) { } @Override - public void assignTo(Serializable rhs) { + public void assign(Serializable rhs) { parent.mutate(this, VariableMutationType.ASSIGN, rhs); } diff --git a/sdk-java/src/test/java/io/littlehorse/sdk/wfsdk/internal/WorkflowThreadImplTest.java b/sdk-java/src/test/java/io/littlehorse/sdk/wfsdk/internal/WorkflowThreadImplTest.java index 681c6a666..2a8aa238e 100644 --- a/sdk-java/src/test/java/io/littlehorse/sdk/wfsdk/internal/WorkflowThreadImplTest.java +++ b/sdk-java/src/test/java/io/littlehorse/sdk/wfsdk/internal/WorkflowThreadImplTest.java @@ -550,7 +550,7 @@ void mutationsShouldUseVariableAssignment() { // Deprecated the literal_value and node_output approach Workflow workflow = new WorkflowImpl("obiwan", wf -> { WfRunVariable myVar = wf.addVariable("my-var", VariableType.STR); - myVar.assignTo("some-value"); + myVar.assign("some-value"); }); PutWfSpecRequest wfSpec = workflow.compileWorkflow(); @@ -570,7 +570,7 @@ void nodeOutputMutationsShouldAlsoUseVariableAssignments() { // Deprecated the literal_value and node_output approach Workflow workflow = new WorkflowImpl("obiwan", wf -> { WfRunVariable myVar = wf.addVariable("my-var", VariableType.STR); - myVar.assignTo(wf.execute("use-the-force")); + myVar.assign(wf.execute("use-the-force")); }); PutWfSpecRequest wfSpec = workflow.compileWorkflow(); @@ -590,7 +590,7 @@ void nodeOutputMutationsShouldCarryJsonPath() { // Deprecated the literal_value and node_output approach Workflow workflow = new WorkflowImpl("obiwan", wf -> { WfRunVariable myVar = wf.addVariable("my-var", VariableType.STR); - myVar.assignTo(wf.execute("use-the-force").jsonPath("$.hello.there")); + myVar.assign(wf.execute("use-the-force").jsonPath("$.hello.there")); }); PutWfSpecRequest wfSpec = workflow.compileWorkflow(); @@ -614,7 +614,7 @@ void assigningVariablesToOtherVariablesShouldUseVariableAssignment() { Workflow workflow = new WorkflowImpl("obiwan", wf -> { WfRunVariable myVar = wf.addVariable("my-var", VariableType.STR); WfRunVariable otherVar = wf.addVariable("other-var", VariableType.STR); - myVar.assignTo(otherVar); + myVar.assign(otherVar); }); PutWfSpecRequest wfSpec = workflow.compileWorkflow(); @@ -632,7 +632,7 @@ void assigningVariablesToOtherVariablesShouldCarryJsonPath() { Workflow workflow = new WorkflowImpl("obiwan", wf -> { WfRunVariable myVar = wf.addVariable("my-var", VariableType.STR); WfRunVariable otherVar = wf.addVariable("other-var", VariableType.JSON_OBJ); - myVar.assignTo(otherVar.jsonPath("$.hello.there")); + myVar.assign(otherVar.jsonPath("$.hello.there")); }); PutWfSpecRequest wfSpec = workflow.compileWorkflow(); diff --git a/sdk-python/examples/basic/example_basic.py b/sdk-python/examples/basic/example_basic.py index bed79022f..fe349646c 100644 --- a/sdk-python/examples/basic/example_basic.py +++ b/sdk-python/examples/basic/example_basic.py @@ -46,4 +46,4 @@ async def main() -> None: if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/sdk-python/littlehorse/workflow.py b/sdk-python/littlehorse/workflow.py index 13b54058a..74d6e72d9 100644 --- a/sdk-python/littlehorse/workflow.py +++ b/sdk-python/littlehorse/workflow.py @@ -130,12 +130,62 @@ def to_variable_assignment(value: Any) -> VariableAssignment: json_path=json_path, variable_name=variable_name, ) + + if isinstance(value, LHExpression): + expression: LHExpression = value + return VariableAssignment( + expression=VariableAssignment.Expression( + lhs=to_variable_assignment(expression.lhs()), + operation=expression.operation(), + rhs=to_variable_assignment(expression.rhs())) + ) return VariableAssignment( literal_value=to_variable_value(value), ) +class LHExpression: + def __init__(self, lhs: Any, operation: VariableMutationType, rhs: Any) -> None: + self._lhs = lhs + self._operation = operation + self._rhs = rhs + + def add(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.ADD, other) + + def subtract(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.SUBTRACT, other) + + def multiply(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.MULTIPLY, other) + + def divide(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.DIVIDE, other) + + def extend(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.EXTEND, other) + + def remove_if_present(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.REMOVE_IF_PRESENT, other) + + def remove_index(self, index: Optional[Union[int, Any]] = None) -> LHExpression: + if index is None: + raise ValueError("Expected 'index' to be set, but it was None.") + return LHExpression(self, VariableMutationType.REMOVE_INDEX, index) + + def remove_key(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.REMOVE_KEY, other) + + def lhs(self) -> Any: + return self._lhs + + def rhs(self) -> Any: + return self._rhs + + def operation(self) -> Any: + return self._operation + class WorkflowCondition: def __init__(self, left_hand: Any, comparator: Comparator, right_hand: Any) -> None: """Returns a WorkflowCondition that can be used in @@ -246,7 +296,7 @@ def __init__(self, format: str, *args: Any) -> None: self._args = args -class NodeOutput: +class NodeOutput(LHExpression): def __init__(self, node_name: str) -> None: self.node_name = node_name self._json_path: Optional[str] = None @@ -284,6 +334,32 @@ def with_json_path(self, json_path: str) -> "NodeOutput": out = NodeOutput(self.node_name) out.json_path = json_path return out + + def add(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.ADD, other) + + def subtract(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.SUBTRACT, other) + + def multiply(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.MULTIPLY, other) + + def divide(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.DIVIDE, other) + + def extend(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.EXTEND, other) + + def remove_if_present(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.REMOVE_IF_PRESENT, other) + + def remove_index(self, index: Optional[Union[int, Any]] = None) -> LHExpression: + if index is None: + raise ValueError("Expected 'index' to be set, but it was None.") + return LHExpression(self, VariableMutationType.REMOVE_INDEX, index) + + def remove_key(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.REMOVE_KEY, other) class WfRunVariable: @@ -291,18 +367,20 @@ def __init__( self, variable_name: str, variable_type: VariableType, + parent: WorkflowThread, default_value: Any = None, access_level: Optional[ Union[WfRunVariableAccessLevel, str] - ] = WfRunVariableAccessLevel.PUBLIC_VAR, + ] = WfRunVariableAccessLevel.PRIVATE_VAR, ) -> None: """Defines a Variable in the ThreadSpec and returns a handle to it. Args: variable_name (str): The name of the variable. variable_type (VariableType): The variable type. - access_level (WfRunVariableAccessLevel): Sets the access level of a WfRunVariable. + parent (WorkflowThread): The parent WorkflowThread of this WfRunVariable. default_value (Any, optional): A default value. Defaults to None. + access_level (WfRunVariableAccessLevel): Sets the access level of a WfRunVariable. Defaults to PRIVATE_VAR. Returns: WfRunVariable: A handle to the created WfRunVariable. @@ -311,7 +389,8 @@ def __init__( TypeError: If variable_type and type(default_value) are not compatible. """ self.name = variable_name - self.type = variable_type + self.type = variable_type + self.parent = parent self.default_value: Optional[VariableValue] = None self._json_path: Optional[str] = None self._required = False @@ -321,14 +400,7 @@ def __init__( self._access_level = access_level if default_value is not None: - self.default_value = to_variable_value(default_value) - if ( - self.default_value.WhichOneof("value") - != str(VariableType.Name(self.type)).lower() - ): - raise TypeError( - f"Default value is not a {VariableType.Name(variable_type)}" - ) + self._set_default(default_value) @property def json_path(self) -> Optional[str]: @@ -368,7 +440,7 @@ def with_json_path(self, json_path: str) -> "WfRunVariable": f"JsonPath not allowed in a {VariableType.Name(self.type)} variable" ) - out = WfRunVariable(self.name, self.type, self.default_value) + out = WfRunVariable(self.name, self.type, self.parent, self.default_value) out.json_path = json_path return out @@ -445,6 +517,21 @@ def searchable_on( def required(self) -> "WfRunVariable": self._required = True return self + + def with_default(self, default_value: Any) -> WfRunVariable: + self._set_default(default_value) + + return self + + def _set_default(self, default_value: Any) -> None: + self.default_value = to_variable_value(default_value) + if ( + self.default_value.WhichOneof("value") + != str(VariableType.Name(self.type)).lower() + ): + raise TypeError( + f"Default value type does not match LH variable type {VariableType.Name(self.type)}" + ) def masked(self) -> "WfRunVariable": self._masked = True @@ -469,6 +556,65 @@ def compile(self) -> ThreadVarDef: access_level=self._access_level, ) + def is_equal_to(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.EQUALS, rhs) + + def is_not_equal_to(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.NOT_EQUALS, rhs) + + def is_greater_than(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.GREATER_THAN, rhs) + + def is_greater_than_eq(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.GREATER_THAN_EQ, rhs) + + def is_less_than_eq(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.LESS_THAN_EQ, rhs) + + def is_less_than(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.LESS_THAN, rhs) + + def does_contain(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.IN, rhs) + + def does_not_contain(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.NOT_IN, rhs) + + def is_in(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.IN, rhs) + + def is_not_in(self, rhs: Any) -> WorkflowCondition: + return self.parent.condition(self, Comparator.NOT_IN, rhs) + + def assign(self, rhs: Any) -> None: + self.parent.mutate(self, VariableMutationType.ASSIGN, rhs) + + def add(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.ADD, other) + + def subtract(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.SUBTRACT, other) + + def multiply(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.MULTIPLY, other) + + def divide(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.DIVIDE, other) + + def extend(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.EXTEND, other) + + def remove_if_present(self, other: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.REMOVE_IF_PRESENT, other) + + def remove_index(self, index: Optional[Union[int, Any]] = None) -> LHExpression: + if index is None: + raise ValueError("Expected 'index' to be set, but it was None.") + return LHExpression(self, VariableMutationType.REMOVE_INDEX, index) + + def remove_key(self, key: Any) -> LHExpression: + return LHExpression(self, VariableMutationType.REMOVE_KEY, key) + def __str__(self) -> str: return to_json(self.compile()) @@ -1129,6 +1275,50 @@ def execute( node_name = self.add_node(readable_name, task_node) return NodeOutput(node_name) + def multiply(self, lhs: Any, rhs: Any) -> LHExpression: + return LHExpression(lhs, VariableMutationType.MULTIPLY, rhs) + + def add(self, lhs: Any, rhs: Any) -> LHExpression: + return LHExpression(lhs, VariableMutationType.ADD, rhs) + + def divide(self, lhs: Any, rhs: Any) -> LHExpression: + return LHExpression(lhs, VariableMutationType.DIVIDE, rhs) + + def subtract(self, lhs: Any, rhs: Any) -> LHExpression: + return LHExpression(lhs, VariableMutationType.SUBTRACT, rhs) + + def extend(self, lhs: Any, rhs: Any) -> LHExpression: + return LHExpression(lhs, VariableMutationType.EXTEND, rhs) + + def remove_if_present(self, lhs: Any, rhs: Any) -> LHExpression: + return LHExpression(lhs, VariableMutationType.REMOVE_IF_PRESENT, rhs) + + def remove_index(self, index: Optional[Union[int, Any]] = None) -> LHExpression: + if index is None: + raise ValueError("Expected 'index' to be set, but it was None.") + return LHExpression(self, VariableMutationType.REMOVE_INDEX, index) + + def remove_key(self, lhs: Any, rhs: Any) -> LHExpression: + return LHExpression(lhs, VariableMutationType.REMOVE_KEY, rhs) + + def declare_bool(self, name: str) -> WfRunVariable: + return self.add_variable(name, VariableType.BOOL) + + def declare_int(self, name: str) -> WfRunVariable: + return self.add_variable(name, VariableType.INT) + + def declare_double(self, name: str) -> WfRunVariable: + return self.add_variable(name, VariableType.DOUBLE) + + def declare_bytes(self, name: str) -> WfRunVariable: + return self.add_variable(name, VariableType.BYTES) + + def declare_json_arr(self, name: str) -> WfRunVariable: + return self.add_variable(name, VariableType.JSON_ARR) + + def declare_json_obj(self, name: str) -> WfRunVariable: + return self.add_variable(name, VariableType.JSON_OBJ) + def handle_any_failure( self, node: NodeOutput, initializer: "ThreadInitializer" ) -> None: @@ -1458,22 +1648,9 @@ def mutate( use the output of a Node Run to mutate variables). """ self._check_if_active() - last_node = self._last_node() - node_output: Optional[VariableMutation.NodeOutputSource] = None - rhs_assignment: Optional[VariableAssignment] = None literal_value: Optional[VariableValue] = None - - if isinstance(right_hand, NodeOutput): - if last_node.name != right_hand.node_name: - raise ReferenceError("NodeOutput does not match with last node") - node_output = VariableMutation.NodeOutputSource( - jsonpath=right_hand.json_path - ) - elif isinstance(right_hand, WfRunVariable): - rhs_assignment = to_variable_assignment(right_hand) - else: - literal_value = to_variable_value(right_hand) + rhs_assignment = to_variable_assignment(right_hand) mutation = VariableMutation( lhs_name=left_hand.name, @@ -1524,7 +1701,7 @@ def add_variable( raise ValueError(f"Variable {variable_name} already added") new_var = WfRunVariable( - variable_name, variable_type, default_value, access_level + variable_name, variable_type, self, default_value, access_level ) self._wf_run_variables.append(new_var) return new_var @@ -1934,7 +2111,6 @@ def with_task_timeout_seconds( self._default_timeout_seconds = timeout_seconds return self - def create_workflow_spec( workflow: Workflow, config: LHConfig, timeout: Optional[int] = None ) -> None: diff --git a/sdk-python/tests/test_utils.py b/sdk-python/tests/test_utils.py index 49d7aa511..9953eca08 100644 --- a/sdk-python/tests/test_utils.py +++ b/sdk-python/tests/test_utils.py @@ -228,7 +228,7 @@ def test_parse_assignment_variable(self): # a WfRunVariable wf_run_variable = WfRunVariable( - variable_name="my-var-name", variable_type=VariableType.STR + variable_name="my-var-name", variable_type=VariableType.STR, parent=None ) wf_run_variable.json_path = "$.myPath" variable = to_variable_assignment(wf_run_variable) diff --git a/sdk-python/tests/test_workflow.py b/sdk-python/tests/test_workflow.py index 6f3c1c59c..1604efd22 100644 --- a/sdk-python/tests/test_workflow.py +++ b/sdk-python/tests/test_workflow.py @@ -74,23 +74,23 @@ def test_validate_json_path_format(self): class TestWfRunVariable(unittest.TestCase): def test_value_is_not_none(self): - variable = WfRunVariable("my-var", VariableType.STR, default_value="my-str") + variable = WfRunVariable("my-var", VariableType.STR, None, default_value="my-str") self.assertEqual(variable.default_value.WhichOneof("value"), "str") self.assertEqual(variable.default_value.str, "my-str") - variable = WfRunVariable("my-var", VariableType.STR) + variable = WfRunVariable("my-var", VariableType.STR, None) self.assertEqual(variable.default_value, None) def test_validate_are_same_type(self): with self.assertRaises(TypeError) as exception_context: - WfRunVariable("my-var", VariableType.STR, 10) + WfRunVariable("my-var", VariableType.STR, None, 10) self.assertEqual( - "Default value is not a STR", + "Default value type does not match LH variable type STR", str(exception_context.exception), ) def test_validate_with_json_path_already_set(self): - variable = WfRunVariable("my-var", VariableType.STR) + variable = WfRunVariable("my-var", VariableType.STR, None) variable.json_path = "$.myPath" with self.assertRaises(ValueError) as exception_context: variable.with_json_path("$.myNewOne") @@ -100,7 +100,7 @@ def test_validate_with_json_path_already_set(self): ) def test_validate_json_path_already_set(self): - variable = WfRunVariable("my-var", VariableType.STR) + variable = WfRunVariable("my-var", VariableType.STR, None) variable.json_path = "$.myPath" with self.assertRaises(ValueError) as exception_context: variable.json_path = "$.myNewOne" @@ -110,7 +110,7 @@ def test_validate_json_path_already_set(self): ) def test_validate_json_path_format(self): - variable = WfRunVariable("my-var", VariableType.STR) + variable = WfRunVariable("my-var", VariableType.STR, None) with self.assertRaises(ValueError) as exception_context: variable.json_path = "$myNewOne" self.assertEqual( @@ -119,7 +119,7 @@ def test_validate_json_path_format(self): ) def test_validate_is_json_obj_when_using_json_index(self): - variable = WfRunVariable("my-var", VariableType.STR) + variable = WfRunVariable("my-var", VariableType.STR, None) with self.assertRaises(ValueError) as exception_context: variable.searchable_on("$.myPath", VariableType.STR) self.assertEqual( @@ -128,11 +128,11 @@ def test_validate_is_json_obj_when_using_json_index(self): ) def test_persistent(self): - variable = WfRunVariable("my-var", VariableType.STR).searchable() + variable = WfRunVariable("my-var", VariableType.STR, None).searchable() self.assertEqual(variable.compile().searchable, True) def test_validate_is_json_obj_when_using_json_pth(self): - variable = WfRunVariable("my-var", VariableType.STR) + variable = WfRunVariable("my-var", VariableType.STR, None) with self.assertRaises(ValueError) as exception_context: variable.with_json_path("$.myPath") self.assertEqual( @@ -140,29 +140,29 @@ def test_validate_is_json_obj_when_using_json_pth(self): str(exception_context.exception), ) - variable = WfRunVariable("my-var", VariableType.JSON_OBJ) + variable = WfRunVariable("my-var", VariableType.JSON_OBJ, None) variable.with_json_path("$.myPath") - variable = WfRunVariable("my-var", VariableType.JSON_ARR) + variable = WfRunVariable("my-var", VariableType.JSON_ARR, None) variable.with_json_path("$.myPath") def test_json_path_creates_new(self): - variable = WfRunVariable("my-var", VariableType.JSON_ARR) + variable = WfRunVariable("my-var", VariableType.JSON_ARR, None) with_json = variable.with_json_path("$.myPath") self.assertIsNot(variable, with_json) def test_compile_variable(self): - variable = WfRunVariable("my-var", VariableType.STR) + variable = WfRunVariable("my-var", VariableType.STR, None) self.assertEqual( variable.compile(), - ThreadVarDef(var_def=VariableDef(name="my-var", type=VariableType.STR)), + ThreadVarDef(var_def=VariableDef(name="my-var", type=VariableType.STR), access_level=WfRunVariableAccessLevel.PRIVATE_VAR), ) - variable = WfRunVariable("my-var", VariableType.JSON_OBJ) + variable = WfRunVariable("my-var", VariableType.JSON_OBJ, None) variable.searchable_on("$.myPath", VariableType.STR) expected_output = ThreadVarDef( var_def=VariableDef(name="my-var", type=VariableType.JSON_OBJ), - access_level="PUBLIC_VAR", + access_level="PRIVATE_VAR", ) expected_output.json_indexes.append( JsonIndex(field_path="$.myPath", field_type=VariableType.STR) @@ -170,7 +170,7 @@ def test_compile_variable(self): self.assertEqual(variable.compile(), expected_output) def test_compile_private_variable(self): - variable = WfRunVariable("my-var", VariableType.STR, access_level="PRIVATE_VAR") + variable = WfRunVariable("my-var", VariableType.STR, None, access_level="PRIVATE_VAR") expected_output = ThreadVarDef( var_def=VariableDef(name="my-var", type=VariableType.STR), access_level="PRIVATE_VAR", @@ -178,7 +178,7 @@ def test_compile_private_variable(self): self.assertEqual(variable.compile(), expected_output) def test_compile_inherited_variable(self): - variable = WfRunVariable("my-var", VariableType.STR) + variable = WfRunVariable("my-var", VariableType.STR, None) variable.with_access_level(WfRunVariableAccessLevel.INHERITED_VAR) expected_output = ThreadVarDef( var_def=VariableDef(name="my-var", type=VariableType.STR), @@ -286,7 +286,7 @@ class MyClass: def if_condition(self, thread: WorkflowThread) -> None: thread.mutate( WfRunVariable( - variable_name="variable-1", variable_type=VariableType.INT + variable_name="variable-1", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 1, @@ -294,7 +294,7 @@ def if_condition(self, thread: WorkflowThread) -> None: thread.execute("task-a") thread.mutate( WfRunVariable( - variable_name="variable-3", variable_type=VariableType.INT + variable_name="variable-3", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 3, @@ -304,7 +304,7 @@ def if_condition(self, thread: WorkflowThread) -> None: def else_condition(self, thread: WorkflowThread) -> None: thread.mutate( WfRunVariable( - variable_name="variable-2", variable_type=VariableType.INT + variable_name="variable-2", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 2, @@ -313,7 +313,7 @@ def else_condition(self, thread: WorkflowThread) -> None: thread.execute("task-d") thread.mutate( WfRunVariable( - variable_name="variable-4", variable_type=VariableType.INT + variable_name="variable-4", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 4, @@ -359,7 +359,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-1", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=1), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=1)), ) ], ), @@ -378,7 +378,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-2", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=2), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=2)), ) ], ), @@ -393,7 +393,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-3", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=3), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=3)), ) ], ) @@ -416,7 +416,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-4", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=4), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=4)), ) ], ) @@ -436,7 +436,7 @@ class MyClass: def if_condition(self, thread: WorkflowThread) -> None: thread.mutate( WfRunVariable( - variable_name="variable-1", variable_type=VariableType.INT + variable_name="variable-1", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 1, @@ -445,7 +445,7 @@ def if_condition(self, thread: WorkflowThread) -> None: def else_condition(self, thread: WorkflowThread) -> None: thread.mutate( WfRunVariable( - variable_name="variable-2", variable_type=VariableType.INT + variable_name="variable-2", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 2, @@ -491,7 +491,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-2", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=2), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=2)), ) ], ), @@ -510,7 +510,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-1", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=1), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=1)), ) ], ), @@ -530,7 +530,7 @@ class MyClass: def if_condition(self, thread: WorkflowThread) -> None: thread.mutate( WfRunVariable( - variable_name="variable-2", variable_type=VariableType.INT + variable_name="variable-2", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 2, @@ -539,7 +539,7 @@ def if_condition(self, thread: WorkflowThread) -> None: def my_entrypoint(self, thread: WorkflowThread) -> None: thread.mutate( WfRunVariable( - variable_name="variable-1", variable_type=VariableType.INT + variable_name="variable-1", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 1, @@ -549,7 +549,7 @@ def my_entrypoint(self, thread: WorkflowThread) -> None: ) thread.mutate( WfRunVariable( - variable_name="variable-3", variable_type=VariableType.INT + variable_name="variable-3", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 3, @@ -575,7 +575,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-1", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=1), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=1)), ) ], ) @@ -599,7 +599,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-2", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=2), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=2)), ) ], ), @@ -626,7 +626,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-3", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=3), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=3)), ) ], ) @@ -642,7 +642,7 @@ class MyClass: def my_condition(self, thread: WorkflowThread) -> None: thread.mutate( WfRunVariable( - variable_name="variable-1", variable_type=VariableType.INT + variable_name="variable-1", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 1, @@ -650,7 +650,7 @@ def my_condition(self, thread: WorkflowThread) -> None: thread.execute("my-task") thread.mutate( WfRunVariable( - variable_name="variable-2", variable_type=VariableType.INT + variable_name="variable-2", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 2, @@ -694,7 +694,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-1", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=1), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=1)), ) ], ), @@ -721,7 +721,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-2", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=2), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=2)), ) ], ) @@ -741,7 +741,7 @@ class MyClass: def my_condition(self, thread: WorkflowThread) -> None: thread.mutate( WfRunVariable( - variable_name="variable-1", variable_type=VariableType.INT + variable_name="variable-1", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 1, @@ -749,7 +749,7 @@ def my_condition(self, thread: WorkflowThread) -> None: thread.execute("my-task") thread.mutate( WfRunVariable( - variable_name="variable-2", variable_type=VariableType.INT + variable_name="variable-2", variable_type=VariableType.INT, parent=thread ), VariableMutationType.ASSIGN, 2, @@ -793,7 +793,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-1", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=1), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=1)), ) ], ), @@ -820,7 +820,7 @@ def to_thread(self): VariableMutation( lhs_name="variable-2", operation=VariableMutationType.ASSIGN, - literal_value=VariableValue(int=2), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=2)), ) ], ) @@ -1061,7 +1061,7 @@ def my_entrypoint(thread: WorkflowThread) -> None: VariableMutation( lhs_name="value", operation=VariableMutationType.MULTIPLY, - literal_value=VariableValue(int=2), + rhs_assignment=VariableAssignment(literal_value=VariableValue(int=2)), ) ], ) @@ -1079,6 +1079,76 @@ def my_entrypoint(thread: WorkflowThread) -> None: ), ) + def test_mutations_should_use_variable_assignments(self): + def my_entrypoint(thread: WorkflowThread) -> None: + my_var = thread.add_variable("my-var", VariableType.STR) + my_var.assign("some-value") + + wf_spec = Workflow("obiwan", my_entrypoint).compile() + entrypoint = wf_spec.thread_specs[wf_spec.entrypoint_thread_name] + node = entrypoint.nodes["0-entrypoint-ENTRYPOINT"] + + edge = node.outgoing_edges[0] + + self.assertEqual(edge.variable_mutations[0].rhs_assignment.literal_value.str, "some-value") + + def test_node_output_mutations_should_also_use_variable_assignments(self): + def my_entrypoint(thread: WorkflowThread) -> None: + my_var = thread.add_variable("my-var", VariableType.STR) + my_var.assign(thread.execute("use-the-force")) + + wf_spec = Workflow("obiwan", my_entrypoint).compile() + entrypoint = wf_spec.thread_specs[wf_spec.entrypoint_thread_name] + node = entrypoint.nodes["1-use-the-force-TASK"] + + edge = node.outgoing_edges[0] + + self.assertEqual(edge.variable_mutations[0].rhs_assignment.node_output.node_name, "1-use-the-force-TASK") + + def test_node_output_mutations_should_carry_json_path(self): + def my_entrypoint(thread: WorkflowThread) -> None: + my_var = thread.add_variable("my-var", VariableType.STR) + my_var.assign(thread.execute("use-the-force").with_json_path("$.hello.there")) + + wfSpec = Workflow("obiwan", my_entrypoint).compile() + entrypoint = wfSpec.thread_specs[wfSpec.entrypoint_thread_name] + node = entrypoint.nodes["1-use-the-force-TASK"] + + edge = node.outgoing_edges[0] + + self.assertEqual(edge.variable_mutations[0].rhs_assignment.node_output.node_name, "1-use-the-force-TASK") + + self.assertEqual(edge.variable_mutations[0].rhs_assignment.json_path, "$.hello.there") + + def test_assigning_variables_to_other_variables_should_use_variable_assignment(self): + def my_entrypoint(thread: WorkflowThread) -> None: + my_var = thread.add_variable("my-var", VariableType.STR) + other_var = thread.add_variable("other-var", VariableType.STR) + my_var.assign(other_var) + + wfSpec = Workflow("obiwan", my_entrypoint).compile() + entrypoint = wfSpec.thread_specs[wfSpec.entrypoint_thread_name] + node = entrypoint.nodes["0-entrypoint-ENTRYPOINT"] + + edge = node.outgoing_edges[0] + self.assertEqual(edge.variable_mutations[0].rhs_assignment.variable_name, "other-var") + + def test_assigning_variables_to_other_variables_should_carry_json_path(self): + def my_entrypoint(thread: WorkflowThread) -> None: + my_var = thread.add_variable("my-var", VariableType.STR) + other_var = thread.add_variable("other-var", VariableType.JSON_OBJ) + my_var.assign(other_var.with_json_path("$.hello.there")) + + wfSpec = Workflow("obiwan", my_entrypoint).compile() + entrypoint = wfSpec.thread_specs[wfSpec.entrypoint_thread_name] + node = entrypoint.nodes["0-entrypoint-ENTRYPOINT"] + + edge = node.outgoing_edges[0] + self.assertEqual(edge.variable_mutations[0].rhs_assignment.variable_name, "other-var") + + self.assertEqual(edge.variable_mutations[0].rhs_assignment.json_path, "$.hello.there") + + class TestWorkflow(unittest.TestCase): def test_entrypoint_is_a_function(self): @@ -2006,7 +2076,7 @@ def wf_func(thread: WorkflowThread) -> None: mutation = mutations[0] self.assertEqual("my-var", mutation.lhs_name) self.assertEqual(VariableMutationType.ASSIGN, mutation.operation) - self.assertTrue(mutation.HasField("node_output")) + self.assertTrue(mutation.HasField("rhs_assignment")) def test_assign_to_variable_user_id(self): def wf_func(thread: WorkflowThread) -> None: @@ -2171,7 +2241,7 @@ def wf_func(thread: WorkflowThread) -> None: def test_reassign_to_user_var(self): def wf_func(thread: WorkflowThread) -> None: - user_var = WfRunVariable("my-var", VariableType.STR) + user_var = WfRunVariable("my-var", VariableType.STR, thread) uto = thread.assign_user_task( "my-user-task", user_id="asdf", @@ -2197,7 +2267,7 @@ def wf_func(thread: WorkflowThread) -> None: def test_reassign_to_group(self): def wf_func(thread: WorkflowThread) -> None: - user_var = WfRunVariable("my-var", VariableType.STR) + user_var = WfRunVariable("my-var", VariableType.STR, thread) uto = thread.assign_user_task( "my-user-task", user_id="asdf", diff --git a/server/src/test/java/e2e/BasicTest.java b/server/src/test/java/e2e/BasicTest.java index 094d1ab96..6e55ff439 100644 --- a/server/src/test/java/e2e/BasicTest.java +++ b/server/src/test/java/e2e/BasicTest.java @@ -27,7 +27,7 @@ public void shouldDoBasic() { public Workflow getBasic() { return new WorkflowImpl("test-basic", thread -> { WfRunVariable asdf = thread.declareBool("asdf"); - asdf.assignTo(thread.execute("ag-one")); + asdf.assign(thread.execute("ag-one")); }); } diff --git a/server/src/test/java/e2e/ExpressionTest.java b/server/src/test/java/e2e/ExpressionTest.java index 13f5b5117..c9c4962a1 100644 --- a/server/src/test/java/e2e/ExpressionTest.java +++ b/server/src/test/java/e2e/ExpressionTest.java @@ -247,7 +247,7 @@ public Workflow getExpression() { // EXTEND a String test var myStr = wf.declareStr("my-str"); wf.doIf(myStr.isNotEqualTo(null), then -> { - myStr.assignTo(myStr.extend("-suffix")); + myStr.assign(myStr.extend("-suffix")); }); // Add an int and composite expressions @@ -255,14 +255,14 @@ public Workflow getExpression() { var intToAddResult = wf.declareInt("int-to-add-result"); wf.doIf(intToAdd.isNotEqualTo(null), then -> { // Tests compound expressions - intToAddResult.assignTo(wf.execute("expr-add-one", intToAdd.add(1))); + intToAddResult.assign(wf.execute("expr-add-one", intToAdd.add(1))); }); // Division By Zero test var thingToDivideByZero = wf.declareInt("thing-to-divide-by-zero"); var divideByZeroResult = wf.declareInt("divide-by-zero-result"); wf.doIf(thingToDivideByZero.isNotEqualTo(null), then -> { - divideByZeroResult.assignTo(thingToDivideByZero.divide(0)); + divideByZeroResult.assign(thingToDivideByZero.divide(0)); }); // Test precision of arithmetic. Make use of the fact that we don't have @@ -273,8 +273,8 @@ public Workflow getExpression() { var divisionResultInt = wf.declareInt("division-result-int"); wf.doIf(divisionTestJson.isNotEqualTo(null), then -> { LHExpression foobar = divisionTestJson.jsonPath("$.lhs").divide(divisionTestJson.jsonPath("$.rhs")); - divisionResult.assignTo(foobar); - divisionResultInt.assignTo(foobar); + divisionResult.assign(foobar); + divisionResultInt.assign(foobar); }); // This test uses a complex expression where the things we are computing over @@ -289,20 +289,20 @@ public Workflow getExpression() { // TotalPrice = Quantity * Price * (1 - DiscountPercentage / 100) LHExpression pedro = quantity.multiply(price).multiply(wf.subtract(1.0, discountPercentage.divide(100.0))); - totalPriceInt.assignTo(pedro); - totalPriceDouble.assignTo(pedro); + totalPriceInt.assign(pedro); + totalPriceDouble.assign(pedro); }); // Test mutating sub-fields of a json object var json = wf.declareJsonObj("json"); wf.doIf(json.isNotEqualTo(null), then -> { - json.jsonPath("$.foo").assignTo("bar"); + json.jsonPath("$.foo").assign("bar"); }); // Test mutating doubly-nested fields of a Json Object var nestedJson = wf.declareJsonObj("nested-json"); wf.doIf(nestedJson.isNotEqualTo(null), then -> { - nestedJson.jsonPath("$.foo.bar").assignTo("baz"); + nestedJson.jsonPath("$.foo.bar").assign("baz"); }); }); }