Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk-python): Port in-line expressions to Python SDK #1166

Merged
merged 26 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9587da0
Catch null default type and refactor error messages
Snarr Nov 26, 2024
3c5fe82
Port LHExpression SDK changes to Python
Snarr Nov 26, 2024
b1eebae
Update tests for new SDK changes
Snarr Nov 27, 2024
1e5d0af
Update 'remove_index' argument to use Optional Union instead of tryin…
Snarr Nov 27, 2024
3f567a0
Spotless apply on SDK Java
Snarr Nov 27, 2024
ae54914
Fix typing "Any" and add return type annotations
Snarr Nov 27, 2024
42caf83
Improve WfRunVariable constructor comments
Snarr Nov 27, 2024
1d8fd25
Add ParentThread to WfRunVariables in tests
Snarr Nov 27, 2024
2c1e250
Merge branch 'master' into feat/expressions-python-sdk
Snarr Nov 27, 2024
af79268
Use 1 underscore for private method
Snarr Nov 27, 2024
338da9d
Merge branch 'feat/expressions-python-sdk' of https://github.com/litt…
Snarr Nov 27, 2024
78f817b
Check for instance of LHExpression
Snarr Nov 27, 2024
cdf2d4c
Add back parent to WfRunVariable
Snarr Nov 27, 2024
08e1e49
Add parent attribute to class
Snarr Nov 27, 2024
8e3cfbc
Restore example basic
Snarr Nov 27, 2024
4d524b7
Remove pass
Snarr Nov 27, 2024
0e07334
Use Proto Expression instead of LHExpression
Snarr Nov 27, 2024
39c45f8
Add self to methods
Snarr Nov 27, 2024
1821425
Add WorkflowThread where applicable
Snarr Nov 27, 2024
d494bba
Merge branch 'master' into feat/expressions-python-sdk
HazimAr Nov 28, 2024
17356af
Port tests to Python
Snarr Nov 28, 2024
301e4be
Merge branch 'feat/expressions-python-sdk' of https://github.com/litt…
Snarr Nov 28, 2024
5313a55
Merge branch 'master' into feat/expressions-python-sdk
Snarr Nov 28, 2024
ca93fbd
Merge branch 'master' into feat/expressions-python-sdk
Snarr Dec 2, 2024
a011a8d
Rename "assign_to" method to "assign" in Java and Python
Snarr Dec 2, 2024
11954d4
Merge branch 'master' into feat/expressions-python-sdk
Snarr Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,14 @@ class WfRunVariableImpl implements WfRunVariable {

public WfRunVariableImpl(String name, Object typeOrDefaultVal, WorkflowThreadImpl parent) {
this.name = name;
this.typeOrDefaultVal = typeOrDefaultVal;
this.parent = parent;

if (typeOrDefaultVal == null) {
throw new IllegalArgumentException(
"The 'typeOrDefaultVal' argument must be either a VariableType or a default value, but a null value was provided.");
}
this.typeOrDefaultVal = typeOrDefaultVal;

// As per GH Issue #582, the default is now PRIVATE_VAR.
this.accessLevel = WfRunVariableAccessLevel.PRIVATE_VAR;
initializeType();
Expand All @@ -49,13 +54,8 @@ private void initializeType() {
if (typeOrDefaultVal instanceof VariableType) {
this.type = (VariableType) typeOrDefaultVal;
} else {
try {
this.defaultValue = LHLibUtil.objToVarVal(typeOrDefaultVal);
this.type = LHLibUtil.fromValueCase(defaultValue.getValueCase());
} catch (LHSerdeError e) {
throw new IllegalArgumentException(
"Was unable to convert provided default value to LH Variable Type", e);
}
setDefaultValue(typeOrDefaultVal);
this.type = LHLibUtil.fromValueCase(defaultValue.getValueCase());
}
}

Expand Down Expand Up @@ -98,15 +98,21 @@ public WfRunVariable required() {

@Override
public WfRunVariable withDefault(Object defaultVal) {
setDefaultValue(defaultVal);

if (!LHLibUtil.fromValueCase(defaultValue.getValueCase()).equals(type)) {
throw new IllegalArgumentException("Default value type does not match LH variable type " + type);
}

return this;
}

private void setDefaultValue(Object defaultVal) {
try {
VariableValue attempt = LHLibUtil.objToVarVal(defaultVal);
if (!LHLibUtil.fromValueCase(attempt.getValueCase()).equals(type)) {
throw new IllegalArgumentException("Default value type does not match variable type");
}
this.defaultValue = LHLibUtil.objToVarVal(defaultVal);
} catch (LHSerdeError e) {
throw new IllegalArgumentException("Was unable to convert provided default value to LH Variable Type", e);
}
return this;
}

@Override
Expand Down
216 changes: 188 additions & 28 deletions sdk-python/littlehorse/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,40 @@ def to_variable_assignment(value: Any) -> VariableAssignment:
)


class LHExpression:
def __init__(self, lhs: Any, operation: VariableMutationType, rhs: Any) -> None:
self._lhs = lhs
self._operation = operation
self._rhs = rhs
pass
Snarr marked this conversation as resolved.
Show resolved Hide resolved

def add(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.ADD, other)

def subtract(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.SUBTRACT, other)

def multiply(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.MULTIPLY, other)

def divide(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.DIVIDE, other)

def extend(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.EXTEND, other)

def remove_if_present(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.REMOVE_IF_PRESENT, other)

def remove_index(self, index: Optional[Union[int, Any]] = None) -> LHExpression:
if index is None:
raise ValueError("Expected 'index' to be set, but it was None.")
return LHExpression(self, VariableMutationType.REMOVE_INDEX, index)

def remove_key(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.REMOVE_KEY, other)


class WorkflowCondition:
def __init__(self, left_hand: Any, comparator: Comparator, right_hand: Any) -> None:
"""Returns a WorkflowCondition that can be used in
Expand Down Expand Up @@ -246,7 +280,7 @@ def __init__(self, format: str, *args: Any) -> None:
self._args = args


class NodeOutput:
class NodeOutput(LHExpression):
def __init__(self, node_name: str) -> None:
self.node_name = node_name
self._json_path: Optional[str] = None
Expand Down Expand Up @@ -284,25 +318,53 @@ def with_json_path(self, json_path: str) -> "NodeOutput":
out = NodeOutput(self.node_name)
out.json_path = json_path
return out

def add(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.ADD, other)

def subtract(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.SUBTRACT, other)

def multiply(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.MULTIPLY, other)

def divide(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.DIVIDE, other)

def extend(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.EXTEND, other)

def remove_if_present(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.REMOVE_IF_PRESENT, other)

def remove_index(self, index: Optional[Union[int, Any]] = None) -> LHExpression:
if index is None:
raise ValueError("Expected 'index' to be set, but it was None.")
return LHExpression(self, VariableMutationType.REMOVE_INDEX, index)

def remove_key(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.REMOVE_KEY, other)


class WfRunVariable:
def __init__(
self,
variable_name: str,
variable_type: VariableType,
parent: WorkflowThread,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the default value is None use = None here

Optional[WorkflowThread]

Copy link
Member

@sauljabin sauljabin Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not totally convince if we should do this.

Actually we can do something like:

def is_equal_to(self, rhs: Any) -> WorkflowCondition:
        return WorkflowCondition(self, Comparator.EQUALS, rhs)

Even though we can implement it similar to java, I would prefer to reduce the dependencies in python.

Circular dependencies errors are very common in python. They (python devs) are going to get rid of it, and make it a more java like language, but i do not know when.

Copy link
Member Author

@Snarr Snarr Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sauljabin I agree with your proposal, it is unnecessary to depend on the parent condition methods when creating a new WorkflowCondition is so simple. If the code for creating a new WorkflowCondition were more complex, it may make sense to centralize it in one place and rely on this dependency injection or another class to hold the methods.

There is one additional method that may not make sense to port without injecting the parent thread dependency:

    @Override
    public void assignTo(Serializable rhs) {
        parent.mutate(this, VariableMutationType.ASSIGN, rhs);
    }

Without injecting the parent WorkflowThread at the constructor, this method may not make sense or be easy to use. Instead, we would have to pass it to the method as an argument, which would not save the user much code and (in my opinion) be less readable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I did not see the mutate

default_value: Any = None,
access_level: Optional[
Union[WfRunVariableAccessLevel, str]
] = WfRunVariableAccessLevel.PUBLIC_VAR,
] = WfRunVariableAccessLevel.PRIVATE_VAR,
Snarr marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""Defines a Variable in the ThreadSpec and returns a handle to it.

Args:
variable_name (str): The name of the variable.
variable_type (VariableType): The variable type.
access_level (WfRunVariableAccessLevel): Sets the access level of a WfRunVariable.
parent (WorkflowThread): The parent WorkflowThread of this variable
default_value (Any, optional): A default value. Defaults to None.
access_level (WfRunVariableAccessLevel): Sets the access level of a WfRunVariable. Defaults to PRIVATE_VAR.

Returns:
WfRunVariable: A handle to the created WfRunVariable.
Expand All @@ -312,6 +374,7 @@ def __init__(
"""
self.name = variable_name
self.type = variable_type
self.parent = parent
self.default_value: Optional[VariableValue] = None
self._json_path: Optional[str] = None
self._required = False
Expand All @@ -321,14 +384,7 @@ def __init__(
self._access_level = access_level

if default_value is not None:
self.default_value = to_variable_value(default_value)
if (
self.default_value.WhichOneof("value")
!= str(VariableType.Name(self.type)).lower()
):
raise TypeError(
f"Default value is not a {VariableType.Name(variable_type)}"
)
self.__set_default(default_value)

@property
def json_path(self) -> Optional[str]:
Expand Down Expand Up @@ -368,7 +424,7 @@ def with_json_path(self, json_path: str) -> "WfRunVariable":
f"JsonPath not allowed in a {VariableType.Name(self.type)} variable"
)

out = WfRunVariable(self.name, self.type, self.default_value)
out = WfRunVariable(self.name, self.type, self.parent, self.default_value)
out.json_path = json_path
return out

Expand Down Expand Up @@ -445,6 +501,21 @@ def searchable_on(
def required(self) -> "WfRunVariable":
self._required = True
return self

def with_default(self, default_value: Any) -> WfRunVariable:
self.__set_default(default_value)

return self

def __set_default(self, default_value: Any) -> None:
Copy link
Member

@sauljabin sauljabin Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 underscore for internal 2 for language related ones.

example __init__ 2 underscore

self.default_value = to_variable_value(default_value)
if (
self.default_value.WhichOneof("value")
!= str(VariableType.Name(self.type)).lower()
):
raise TypeError(
f"Default value type does not match LH variable type {VariableType.Name(self.type)}"
)

def masked(self) -> "WfRunVariable":
self._masked = True
Expand All @@ -469,6 +540,65 @@ def compile(self) -> ThreadVarDef:
access_level=self._access_level,
)

def is_equal_to(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.EQUALS, rhs)

def is_not_equal_to(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.NOT_EQUALS, rhs)

def is_greater_than(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.GREATER_THAN, rhs)

def is_greater_than_eq(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.GREATER_THAN_EQ, rhs)

def is_less_than_eq(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.LESS_THAN_EQ, rhs)

def is_less_than(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.LESS_THAN, rhs)

def does_contain(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.IN, rhs)

def does_not_contain(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.NOT_IN, rhs)

def is_in(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.IN, rhs)

def is_not_in(self, rhs: Any) -> WorkflowCondition:
return self.parent.condition(self, Comparator.NOT_IN, rhs)

def assign_to(self, rhs: Any) -> None:
self.parent.mutate(self, VariableMutationType.ASSIGN, rhs)

def add(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.ADD, other)

def subtract(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.SUBTRACT, other)

def multiply(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.MULTIPLY, other)

def divide(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.DIVIDE, other)

def extend(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.EXTEND, other)

def remove_if_present(self, other: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.REMOVE_IF_PRESENT, other)

def remove_index(self, index: Optional[Union[int, Any]] = None) -> LHExpression:
if index is None:
raise ValueError("Expected 'index' to be set, but it was None.")
return LHExpression(self, VariableMutationType.REMOVE_INDEX, index)

def remove_key(self, key: Any) -> LHExpression:
return LHExpression(self, VariableMutationType.REMOVE_KEY, key)

def __str__(self) -> str:
return to_json(self.compile())

Expand Down Expand Up @@ -1129,6 +1259,50 @@ def execute(
node_name = self.add_node(readable_name, task_node)
return NodeOutput(node_name)

def multiply(lhs: Any, rhs: Any) -> LHExpression:
return LHExpression(lhs, VariableMutationType.MULTIPLY, rhs)

def add(lhs: Any, rhs: Any) -> LHExpression:
return LHExpression(lhs, VariableMutationType.ADD, rhs)

def divide(lhs: Any, rhs: Any) -> LHExpression:
return LHExpression(lhs, VariableMutationType.DIVIDE, rhs)

def subtract(lhs: Any, rhs: Any) -> LHExpression:
return LHExpression(lhs, VariableMutationType.SUBTRACT, rhs)

def extend(lhs: Any, rhs: Any) -> LHExpression:
return LHExpression(lhs, VariableMutationType.EXTEND, rhs)

def remove_if_present(lhs: Any, rhs: Any) -> LHExpression:
return LHExpression(lhs, VariableMutationType.REMOVE_IF_PRESENT, rhs)

def remove_index(self, index: Optional[Union[int, Any]] = None) -> LHExpression:
if index is None:
raise ValueError("Expected 'index' to be set, but it was None.")
return LHExpression(self, VariableMutationType.REMOVE_INDEX, index)

def remove_key(lhs: Any, rhs: Any) -> LHExpression:
return LHExpression(lhs, VariableMutationType.REMOVE_KEY, rhs)

def declare_bool(self, name: str) -> WfRunVariable:
return self.add_variable(name, VariableType.BOOL)

def declare_int(self, name: str) -> WfRunVariable:
return self.add_variable(name, VariableType.INT)

def declare_double(self, name: str) -> WfRunVariable:
return self.add_variable(name, VariableType.DOUBLE)

def declare_bytes(self, name: str) -> WfRunVariable:
return self.add_variable(name, VariableType.BYTES)

def declare_json_arr(self, name: str) -> WfRunVariable:
return self.add_variable(name, VariableType.JSON_ARR)

def declare_json_obj(self, name: str) -> WfRunVariable:
return self.add_variable(name, VariableType.JSON_OBJ)

def handle_any_failure(
self, node: NodeOutput, initializer: "ThreadInitializer"
) -> None:
Expand Down Expand Up @@ -1458,22 +1632,9 @@ def mutate(
use the output of a Node Run to mutate variables).
"""
self._check_if_active()
last_node = self._last_node()

node_output: Optional[VariableMutation.NodeOutputSource] = None
rhs_assignment: Optional[VariableAssignment] = None
literal_value: Optional[VariableValue] = None

if isinstance(right_hand, NodeOutput):
if last_node.name != right_hand.node_name:
raise ReferenceError("NodeOutput does not match with last node")
node_output = VariableMutation.NodeOutputSource(
jsonpath=right_hand.json_path
)
elif isinstance(right_hand, WfRunVariable):
rhs_assignment = to_variable_assignment(right_hand)
else:
literal_value = to_variable_value(right_hand)
rhs_assignment = to_variable_assignment(right_hand)

mutation = VariableMutation(
lhs_name=left_hand.name,
Expand Down Expand Up @@ -1524,7 +1685,7 @@ def add_variable(
raise ValueError(f"Variable {variable_name} already added")

new_var = WfRunVariable(
variable_name, variable_type, default_value, access_level
variable_name, variable_type, self, default_value, access_level
)
self._wf_run_variables.append(new_var)
return new_var
Expand Down Expand Up @@ -1934,7 +2095,6 @@ def with_task_timeout_seconds(
self._default_timeout_seconds = timeout_seconds
return self


def create_workflow_spec(
workflow: Workflow, config: LHConfig, timeout: Optional[int] = None
) -> None:
Expand Down
2 changes: 1 addition & 1 deletion sdk-python/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def test_parse_assignment_variable(self):

# a WfRunVariable
wf_run_variable = WfRunVariable(
variable_name="my-var-name", variable_type=VariableType.STR
variable_name="my-var-name", variable_type=VariableType.STR, parent=None
)
wf_run_variable.json_path = "$.myPath"
variable = to_variable_assignment(wf_run_variable)
Expand Down
Loading
Loading