Skip to content

Commit

Permalink
Python sdk: add reminder tasks for user task node output
Browse files Browse the repository at this point in the history
  • Loading branch information
eduwercamacaro committed Feb 19, 2024
1 parent 16f41af commit 8f576d3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
25 changes: 25 additions & 0 deletions sdk-python/littlehorse/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,31 @@ def release_to_group_on_deadline(
)
ut_node.actions.append(trigger)

def schedule_reminder_task(
self,
user_task: UserTaskOutput,
delay_in_seconds: int,
task_def_name: str,
*args: Any,
) -> None:
delay_in_seconds_var = to_variable_assignment(delay_in_seconds)
task_node = TaskNode(
task_def_id=TaskDefId(name=task_def_name),
variables=[to_variable_assignment(arg) for arg in args],
)
trigger: UTActionTrigger = UTActionTrigger(
task=UTActionTrigger.UTATask(task=task_node),
delay_seconds=delay_in_seconds_var,
hook=UTActionTrigger.ON_ARRIVAL,
)
cur_node = self._last_node()

if cur_node.name != user_task.node_name:
raise ValueError("Tried to reassign stale User Task!")

ut_node: UserTaskNode = typing.cast(UserTaskNode, cur_node.sub_node)
ut_node.actions.append(trigger)

def wait_for_event(self, event_name: str, timeout: int = -1) -> NodeOutput:
"""Adds an EXTERNAL_EVENT node which blocks until an
'ExternalEvent' of the specified type arrives.
Expand Down
24 changes: 24 additions & 0 deletions sdk-python/tests/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1762,6 +1762,30 @@ def wf_func(thread: WorkflowThread) -> None:
reassign = action.reassign
self.assertEqual(reassign.user_group.literal_value.str, "my-group")

def test_reminder_task(self):
def wf_func(thread: WorkflowThread) -> None:
uto = thread.assign_user_task(
"my-user-task",
user_id="asdf",
user_group="my-group",
)
thread.schedule_reminder_task(uto, 60, "my-reminder-task", "my-arg")

wf = Workflow("my-wf", wf_func).compile()
thread = wf.thread_specs[wf.entrypoint_thread_name]

node = thread.nodes["1-my-user-task-USER_TASK"]
ut_node = node.user_task

self.assertEqual(len(ut_node.actions), 1)

action = ut_node.actions[0]
self.assertEqual(action.delay_seconds.literal_value.int, 60)
self.assertTrue(action.HasField("task"))
reminder_task = action.task
print(reminder_task)
self.assertEqual(reminder_task.task.task_def_id.name, "my-reminder-task")

def test_reassign_to_user_str(self):
def wf_func(thread: WorkflowThread) -> None:
uto = thread.assign_user_task(
Expand Down

0 comments on commit 8f576d3

Please sign in to comment.