Skip to content

Commit

Permalink
Merge branch 'master' into feat/workflow-event-noderunid
Browse files Browse the repository at this point in the history
  • Loading branch information
Snarr authored Dec 10, 2024
2 parents 74067f8 + e3bbe4a commit 6a3b690
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,19 @@ func QuickstartWorkflow(wf *littlehorse.WorkflowThread) {
</TabItem>
<TabItem value="python" label="Python">

Python docs for user tasks coming soon.

```python
def get_workflow() -> Workflow:
def my_entrypoint(wf: WorkflowThread) -> None:
task_def_name = "greet"
user_task_output = wf.assign_user_task("person-details", None, "writer-group")
delay_in_seconds = 10 // wait 10 seconds after the task is assigned to schedule the reminder
arg1 = "Sam"
arg2 = {"identification": "1258796641-4", "Address": "NA-Street", "Age": 28}
wf.schedule_reminder_task(user_task_output, delay_in_seconds, task_def_name, arg1, arg2)

return Workflow("example-user-tasks", my_entrypoint)
```
</TabItem>
</Tabs>

Expand Down
36 changes: 36 additions & 0 deletions sdk-python/examples/user_tasks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
## Running a User Tasks Example

This is a simple example of user tasks usage scheduling a reminder task before the user task is completed:

Let's run the example:

```
poetry shell
python ./examples/user_tasks/user_tasks.py
```

In another terminal, use `lhctl` to run the workflow:

```
lhctl run example-user-tasks
```

When you run the example-user-tasks workflow, the taskworker should call the `greet` task method and print
the following message:

```
Hello Sam!. WfRun {{wfRunId}} Person: {'identification': '1258796641-4', 'Address': 'NA-Street', 'Age': 28}
```

In addition, you can check the result with:

```
# This call shows the result
lhctl get wfRun <wf_run_id>
# This will show you all nodes in tha run
lhctl list nodeRun <wf_run_id>
# This shows the task run information
lhctl get taskRun <wf_run_id> <task_run_global_id>
```
69 changes: 69 additions & 0 deletions sdk-python/examples/user_tasks/user_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import asyncio
import logging
from pathlib import Path
import random

import littlehorse
from littlehorse.config import LHConfig
from littlehorse.model import VariableType, PutUserTaskDefRequest, UserTaskField
from littlehorse.worker import LHTaskWorker, WorkerContext
from littlehorse.workflow import WorkflowThread, Workflow
from typing import Any

logging.basicConfig(level=logging.INFO)


def get_config() -> LHConfig:
config = LHConfig()
config_path = Path.home().joinpath(".config", "littlehorse.config")
if config_path.exists():
config.load(config_path)
return config

async def get_user_task_def() -> PutUserTaskDefRequest:
return PutUserTaskDefRequest(
name="person-details",
fields=[
UserTaskField(
name="PersonDetails",
description="Person complementary information",
display_name="Other Details",
required=True,
type=VariableType.STR
)
]
)

def get_workflow() -> Workflow:
def my_entrypoint(wf: WorkflowThread) -> None:
task_def_name = "greet"
user_task_output = wf.assign_user_task("person-details", None, "writer-group")
delay_in_seconds = 10
arg1 = "Sam"
arg2 = {"identification": "1258796641-4", "Address": "NA-Street", "Age": 28}

wf.schedule_reminder_task(user_task_output, delay_in_seconds, task_def_name, arg1, arg2)

return Workflow("example-user-tasks", my_entrypoint)

async def greeting(name: str, person_details: dict[str, Any], ctx: WorkerContext) -> str:
msg = f"Hello {name}!. WfRun {ctx.wf_run_id.id} Person: {person_details}"
print(msg)
await asyncio.sleep(random.uniform(0.5, 1.5))
return msg

async def main() -> None:
config = get_config()
wf = get_workflow()
client = config.stub()

user_task_def = await get_user_task_def()
client.PutUserTaskDef(user_task_def)
littlehorse.create_task_def(greeting, "greet", config)
littlehorse.create_workflow_spec(wf, config)

await littlehorse.start(LHTaskWorker(greeting, "greet", config))


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion sdk-python/littlehorse/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1575,7 +1575,7 @@ def schedule_reminder_task(
delay_in_seconds_var,
task_def_name,
UTActionTrigger.ON_ARRIVAL,
args,
*args,
)

def schedule_reminder_task_on_assignment(
Expand Down

0 comments on commit 6a3b690

Please sign in to comment.