Skip to content

Commit

Permalink
Python sdk: add reminder tasks for user task node output
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Feb 19, 2024
1 parent 8f576d3 commit 30c74c4
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 17 deletions.
52 changes: 36 additions & 16 deletions sdk-python/littlehorse/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,31 @@ def _find_next_node(self, name: str) -> WorkflowNode:
return self._nodes[i]
raise ReferenceError("Next node not found")

def _schedule_reminder_task_helper(
self,
user_task: UserTaskOutput,
delay_in_seconds: VariableAssignment,
task_def_name: str,
hook: UTActionTrigger.UTHook,
*args: Any,
) -> None:
task_node = TaskNode(
task_def_id=TaskDefId(name=task_def_name),
variables=[to_variable_assignment(arg) for arg in args],
)
trigger: UTActionTrigger = UTActionTrigger(
task=UTActionTrigger.UTATask(task=task_node),
delay_seconds=delay_in_seconds,
hook=hook,
)
cur_node = self._last_node()

if cur_node.name != user_task.node_name:
raise ValueError("Tried to reassign stale User Task!")

ut_node: UserTaskNode = typing.cast(UserTaskNode, cur_node.sub_node)
ut_node.actions.append(trigger)

def execute(self, task_name: str, *args: Any) -> NodeOutput:
"""Adds a TASK node to the ThreadSpec.
Expand Down Expand Up @@ -1123,27 +1148,22 @@ def release_to_group_on_deadline(
def schedule_reminder_task(
self,
user_task: UserTaskOutput,
delay_in_seconds: int,
delay_in_seconds: Union[int, WfRunVariable],
task_def_name: str,
*args: Any,
) -> None:
delay_in_seconds_var = to_variable_assignment(delay_in_seconds)
task_node = TaskNode(
task_def_id=TaskDefId(name=task_def_name),
variables=[to_variable_assignment(arg) for arg in args],
)
trigger: UTActionTrigger = UTActionTrigger(
task=UTActionTrigger.UTATask(task=task_node),
delay_seconds=delay_in_seconds_var,
hook=UTActionTrigger.ON_ARRIVAL,
)
cur_node = self._last_node()
self._schedule_reminder_task_helper(user_task, delay_in_seconds_var, task_def_name, UTActionTrigger.ON_ARRIVAL, args)

if cur_node.name != user_task.node_name:
raise ValueError("Tried to reassign stale User Task!")

ut_node: UserTaskNode = typing.cast(UserTaskNode, cur_node.sub_node)
ut_node.actions.append(trigger)
def schedule_reminder_task_on_assignment(
self,
user_task: UserTaskOutput,
delay_in_seconds: Union[int, WfRunVariable],
task_def_name: str,
*args: Any,
) -> None:
delay_in_seconds_var = to_variable_assignment(delay_in_seconds)
self._schedule_reminder_task_helper(user_task, delay_in_seconds_var, task_def_name, UTActionTrigger.ON_TASK_ASSIGNED, args)

def wait_for_event(self, event_name: str, timeout: int = -1) -> NodeOutput:
"""Adds an EXTERNAL_EVENT node which blocks until an
Expand Down
27 changes: 26 additions & 1 deletion sdk-python/tests/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
VariableDef,
VariableMutation,
VariableMutationType,
UTActionTrigger,
)
from littlehorse.model.service_pb2 import PutWfSpecRequest
from littlehorse.model.variable_pb2 import VariableValue
Expand Down Expand Up @@ -1780,10 +1781,34 @@ def wf_func(thread: WorkflowThread) -> None:
self.assertEqual(len(ut_node.actions), 1)

action = ut_node.actions[0]
self.assertEqual(action.hook, UTActionTrigger.ON_ARRIVAL)
self.assertEqual(action.delay_seconds.literal_value.int, 60)
self.assertTrue(action.HasField("task"))
reminder_task = action.task
self.assertEqual(reminder_task.task.task_def_id.name, "my-reminder-task")

def test_reminder_task_on_assignment(self):
def wf_func(thread: WorkflowThread) -> None:
uto = thread.assign_user_task(
"my-user-task",
user_id="asdf",
user_group="my-group",
)
thread.schedule_reminder_task_on_assignment(uto, 60, "my-reminder-task", "my-arg")

wf = Workflow("my-wf", wf_func).compile()
thread = wf.thread_specs[wf.entrypoint_thread_name]

node = thread.nodes["1-my-user-task-USER_TASK"]
ut_node = node.user_task

self.assertEqual(len(ut_node.actions), 1)

action = ut_node.actions[0]
self.assertEqual(action.hook, UTActionTrigger.ON_TASK_ASSIGNED)
self.assertEqual(action.delay_seconds.literal_value.int, 60)
self.assertTrue(action.HasField("task"))
reminder_task = action.task
print(reminder_task)
self.assertEqual(reminder_task.task.task_def_id.name, "my-reminder-task")

def test_reassign_to_user_str(self):
Expand Down

0 comments on commit 30c74c4

Please sign in to comment.