Skip to content

Commit 5ca0c87

Browse files
famartingdependabot[bot]elena-kolevskahhunter-msemsearcy
committed
workflows: update durabletask dependency (dapr#757)
* Bump codecov/codecov-action from 4 to 5 (dapr#753) Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4 to 5. - [Release notes](https://github.com/codecov/codecov-action/releases) - [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md) - [Commits](codecov/codecov-action@v4...v5) --- updated-dependencies: - dependency-name: codecov/codecov-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Signed-off-by: Elena Kolevska <[email protected]> * update durabletask to use fork Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * add purge workflow function Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * support reuse id policy Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * support set custom status Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * Update ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * Update ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * Update ext/dapr-ext-workflow/tests/test_workflow_client.py Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * Update ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * update test, grpc version and lint Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * Adds missing arguments in FakeTaskHubGrpcClient Signed-off-by: Elena Kolevska <[email protected]> * linter Signed-off-by: Elena Kolevska <[email protected]> * remove alpha for workflow stable release (dapr#760) Signed-off-by: Hannah Hunter <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * Replace deprecated tox.ini option (dapr#762) This option was replaced in 2020, deprecated, and eventually removed in tox 4. The correct option already appears elseware in this tox.ini file. This fix is necessary to run `tox -e doc` per the README.md instructions on tox 4. Signed-off-by: Eric Searcy <[email protected]> Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * Add Actor Mocks (dapr#750) * Moved files to new branch to avoid weird git bug Signed-off-by: Lorenzo Curcio <[email protected]> * requested documentation changes Signed-off-by: Lorenzo Curcio <[email protected]> * forgot to move file back to starting point Signed-off-by: Lorenzo Curcio <[email protected]> * result of ruff format Signed-off-by: Lorenzo Curcio <[email protected]> * fixed minor formatting issues, fixed type issues Signed-off-by: Lorenzo Curcio <[email protected]> * minor test fix Signed-off-by: Lorenzo Curcio <[email protected]> * fixes try_add_state Signed-off-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Revert "fixes try_add_state" This reverts commit 254ad17. Signed-off-by: Lorenzo Curcio <[email protected]> * Update dapr/actor/runtime/mock_state_manager.py Fixing bug in try_add_state as mentioned in PR dapr#756 Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Update dapr/actor/runtime/mock_actor.py Whoops missed this Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Update daprdocs/content/en/python-sdk-docs/python-actor.md Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * minor error in docs Signed-off-by: Lorenzo Curcio <[email protected]> * fixed and added more unit tests. Added example Signed-off-by: Lorenzo Curcio <[email protected]> * unittest fix Signed-off-by: Lorenzo Curcio <[email protected]> * Update examples/demo_actor/README.md Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * concentrated some tests Signed-off-by: Lorenzo Curcio <[email protected]> * removed unnecessary type hint Signed-off-by: Lorenzo Curcio <[email protected]> * Update daprdocs/content/en/python-sdk-docs/python-actor.md didnt see this earlier whoops Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * Update examples/demo_actor/README.md Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> * documentation changes Signed-off-by: Lorenzo Curcio <[email protected]> * now requires #type: ignore Signed-off-by: Lorenzo Curcio <[email protected]> * small docs change Signed-off-by: Elena Kolevska <[email protected]> * examples test fix Signed-off-by: Elena Kolevska <[email protected]> --------- Signed-off-by: Lorenzo Curcio <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> Co-authored-by: Elena Kolevska <[email protected]> Co-authored-by: Lorenzo Curcio <[email protected]> Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * Fixes try_add_state in actor state manger (dapr#756) Signed-off-by: Elena Kolevska <[email protected]> * Integration test for http invocation (dapr#758) Signed-off-by: Elena Kolevska <[email protected]> * fixes missing state store in test (dapr#759) Signed-off-by: Elena Kolevska <[email protected]> * Mark workflows API functions as deprecated (dapr#749) * workflows, remove deprecated functions Signed-off-by: Fabian Martinez <[email protected]> * revert changes to example Signed-off-by: Fabian Martinez <[email protected]> * update warning messages Signed-off-by: Fabian Martinez <[email protected]> * Typos Signed-off-by: Elena Kolevska <[email protected]> * fixes linter Signed-off-by: Elena Kolevska <[email protected]> * Apply suggestions from code review Signed-off-by: Elena Kolevska <[email protected]> * Apply suggestions from code review Signed-off-by: Elena Kolevska <[email protected]> --------- Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> Co-authored-by: Elena Kolevska <[email protected]> Co-authored-by: Elena Kolevska <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> * Removes support for 3.8 and adds 3.13 to test version matrix (dapr#763) Signed-off-by: Elena Kolevska <[email protected]> * Updates dapr email to dapr.io (dapr#764) Signed-off-by: Elena Kolevska <[email protected]> * Reverts grpc bump Signed-off-by: Elena Kolevska <[email protected]> * Updates protos and fixes grpc-tools for protos generation (dapr#766) * Updates protos and fixes grpc-tools for protos generation Signed-off-by: Elena Kolevska <[email protected]> * bumps grpcio tools version Signed-off-by: Elena Kolevska <[email protected]> --------- Signed-off-by: Elena Kolevska <[email protected]> * Bump dapr/durabletask version Signed-off-by: Elena Kolevska <[email protected]> --------- Signed-off-by: Elena Kolevska <[email protected]> Signed-off-by: Fabian Martinez <[email protected]> Signed-off-by: Hannah Hunter <[email protected]> Signed-off-by: Eric Searcy <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> Signed-off-by: Lorenzo Curcio <[email protected]> Signed-off-by: Elena Kolevska <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Elena Kolevska <[email protected]> Co-authored-by: Elena Kolevska <[email protected]> Co-authored-by: Hannah Hunter <[email protected]> Co-authored-by: Eric Searcy <[email protected]> Co-authored-by: Lorenzo Curcio <[email protected]> Co-authored-by: Lorenzo Curcio <[email protected]> Signed-off-by: Elena Kolevska <[email protected]>
1 parent 5d01ddc commit 5ca0c87

10 files changed

+77
-19
lines changed

dev-requirements.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ Flask>=1.1
1515
# needed for auto fix
1616
ruff===0.2.2
1717
# needed for dapr-ext-workflow
18-
durabletask>=0.1.1a1
18+
durabletask-dapr >= 0.2.0a4

examples/workflow/monitor.py

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def send_alert(ctx, message: str):
6969
except Exception:
7070
pass
7171
if not status or status.runtime_status.name != 'RUNNING':
72+
# TODO update to use reuse_id_policy
7273
instance_id = wf_client.schedule_new_workflow(
7374
workflow=status_monitor_workflow,
7475
input=JobStatus(job_id=job_id, is_healthy=True),

examples/workflow/task_chaining.py

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
2727
except Exception as e:
2828
yield ctx.call_activity(error_handler, input=str(e))
2929
raise
30+
# TODO update to set custom status
3031
return [result1, result2, result3]
3132

3233

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

+25-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
from datetime import datetime
1818
from typing import Any, Optional, TypeVar
1919

20+
2021
from durabletask import client
22+
import durabletask.internal.orchestrator_service_pb2 as pb
2123

2224
from dapr.ext.workflow.workflow_state import WorkflowState
2325
from dapr.ext.workflow.workflow_context import Workflow
@@ -78,6 +80,7 @@ def schedule_new_workflow(
7880
input: Optional[TInput] = None,
7981
instance_id: Optional[str] = None,
8082
start_at: Optional[datetime] = None,
83+
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
8184
) -> str:
8285
"""Schedules a new workflow instance for execution.
8386
@@ -90,6 +93,8 @@ def schedule_new_workflow(
9093
start_at: The time when the workflow instance should start executing.
9194
If not specified or if a date-time in the past is specified, the workflow instance will
9295
be scheduled immediately.
96+
reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with
97+
an existing workflow instance.
9398
9499
Returns:
95100
The ID of the scheduled workflow instance.
@@ -100,9 +105,14 @@ def schedule_new_workflow(
100105
input=input,
101106
instance_id=instance_id,
102107
start_at=start_at,
108+
reuse_id_policy=reuse_id_policy,
103109
)
104110
return self.__obj.schedule_new_orchestration(
105-
workflow.__name__, input=input, instance_id=instance_id, start_at=start_at
111+
workflow.__name__,
112+
input=input,
113+
instance_id=instance_id,
114+
start_at=start_at,
115+
reuse_id_policy=reuse_id_policy,
106116
)
107117

108118
def get_workflow_state(
@@ -208,7 +218,9 @@ def raise_workflow_event(
208218
"""
209219
return self.__obj.raise_orchestration_event(instance_id, event_name, data=data)
210220

211-
def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
221+
def terminate_workflow(
222+
self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
223+
):
212224
"""Terminates a running workflow instance and updates its runtime status to
213225
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
214226
the task hub. When the task hub worker processes this message, it will update the runtime
@@ -226,9 +238,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None):
226238
Args:
227239
instance_id: The ID of the workflow instance to terminate.
228240
output: The optional output to set for the terminated workflow instance.
241+
recursive: The optional flag to terminate all child workflows.
229242
230243
"""
231-
return self.__obj.terminate_orchestration(instance_id, output=output)
244+
return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive)
232245

233246
def pause_workflow(self, instance_id: str):
234247
"""Suspends a workflow instance, halting processing of it until resume_workflow is used to
@@ -246,3 +259,12 @@ def resume_workflow(self, instance_id: str):
246259
instance_id: The instance ID of the workflow to resume.
247260
"""
248261
return self.__obj.resume_orchestration(instance_id)
262+
263+
def purge_workflow(self, instance_id: str, recursive: bool = True):
264+
"""Purge data from a workflow instance.
265+
266+
Args:
267+
instance_id: The instance ID of the workflow to purge.
268+
recursive: The optional flag to also purge data from all child workflows.
269+
"""
270+
return self.__obj.purge_orchestration(instance_id, recursive)

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

+4
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ def current_utc_datetime(self) -> datetime:
5353
def is_replaying(self) -> bool:
5454
return self.__obj.is_replaying
5555

56+
def set_custom_status(self, custom_status: str) -> None:
57+
self._logger.debug(f'{self.instance_id}: Setting custom status to {custom_status}')
58+
self.__obj.set_custom_status(custom_status)
59+
5660
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
5761
self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time')
5862
return self.__obj.create_timer(fire_at)

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py

+5
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ def is_replaying(self) -> bool:
8484
"""
8585
pass
8686

87+
@abstractmethod
88+
def set_custom_status(self, custom_status: str) -> None:
89+
"""Set the custom status."""
90+
pass
91+
8792
@abstractmethod
8893
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
8994
"""Create a Timer Task to fire after at the specified deadline.

ext/dapr-ext-workflow/setup.cfg

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ packages = find_namespace:
2525
include_package_data = True
2626
install_requires =
2727
dapr-dev >= 1.13.0rc1.dev
28-
durabletask >= 0.1.1a1
28+
durabletask-dapr >= 0.2.0a4
2929

3030
[options.packages.find]
3131
include =

ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py

+8
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
mock_create_timer = 'create_timer'
2626
mock_call_activity = 'call_activity'
2727
mock_call_sub_orchestrator = 'call_sub_orchestrator'
28+
mock_custom_status = 'custom_status'
2829

2930

3031
class FakeOrchestrationContext:
3132
def __init__(self):
3233
self.instance_id = mock_instance_id
34+
self.custom_status = None
3335

3436
def create_timer(self, fire_at):
3537
return mock_create_timer
@@ -40,6 +42,9 @@ def call_activity(self, activity, input):
4042
def call_sub_orchestrator(self, orchestrator, input, instance_id):
4143
return mock_call_sub_orchestrator
4244

45+
def set_custom_status(self, custom_status):
46+
self.custom_status = custom_status
47+
4348

4449
class DaprWorkflowContextTest(unittest.TestCase):
4550
def mock_client_activity(ctx: WorkflowActivityContext, input):
@@ -65,3 +70,6 @@ def test_workflow_context_functions(self):
6570

6671
create_timer_result = dapr_wf_ctx.create_timer(mock_date_time)
6772
assert create_timer_result == mock_create_timer
73+
74+
dapr_wf_ctx.set_custom_status(mock_custom_status)
75+
assert fakeContext.custom_status == mock_custom_status

ext/dapr-ext-workflow/tests/test_workflow_client.py

+30-13
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,26 @@
2020
from unittest import mock
2121
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
2222
from durabletask import client
23+
import durabletask.internal.orchestrator_service_pb2 as pb
2324

2425
mock_schedule_result = 'workflow001'
2526
mock_raise_event_result = 'event001'
2627
mock_terminate_result = 'terminate001'
2728
mock_suspend_result = 'suspend001'
2829
mock_resume_result = 'resume001'
29-
mockInstanceId = 'instance001'
30+
mock_purge_result = 'purge001'
31+
mock_instance_id = 'instance001'
3032

3133

3234
class FakeTaskHubGrpcClient:
33-
def schedule_new_orchestration(self, workflow, input, instance_id, start_at):
35+
def schedule_new_orchestration(
36+
self,
37+
workflow,
38+
input,
39+
instance_id,
40+
start_at,
41+
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None,
42+
):
3443
return mock_schedule_result
3544

3645
def get_orchestration_state(self, instance_id, fetch_payloads):
@@ -49,7 +58,9 @@ def raise_orchestration_event(
4958
):
5059
return mock_raise_event_result
5160

52-
def terminate_orchestration(self, instance_id: str, *, output: Union[Any, None] = None):
61+
def terminate_orchestration(
62+
self, instance_id: str, *, output: Union[Any, None] = None, recursive: bool = True
63+
):
5364
return mock_terminate_result
5465

5566
def suspend_orchestration(self, instance_id: str):
@@ -58,6 +69,9 @@ def suspend_orchestration(self, instance_id: str):
5869
def resume_orchestration(self, instance_id: str):
5970
return mock_resume_result
6071

72+
def purge_orchestration(self, instance_id: str, recursive: bool = True):
73+
return mock_purge_result
74+
6175
def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus):
6276
return client.OrchestrationState(
6377
instance_id=instance_id,
@@ -87,35 +101,38 @@ def test_client_functions(self):
87101
assert actual_schedule_result == mock_schedule_result
88102

89103
actual_get_result = wfClient.get_workflow_state(
90-
instance_id=mockInstanceId, fetch_payloads=True
104+
instance_id=mock_instance_id, fetch_payloads=True
91105
)
92106
assert actual_get_result.runtime_status.name == 'PENDING'
93-
assert actual_get_result.instance_id == mockInstanceId
107+
assert actual_get_result.instance_id == mock_instance_id
94108

95109
actual_wait_start_result = wfClient.wait_for_workflow_start(
96-
instance_id=mockInstanceId, timeout_in_seconds=30
110+
instance_id=mock_instance_id, timeout_in_seconds=30
97111
)
98112
assert actual_wait_start_result.runtime_status.name == 'RUNNING'
99-
assert actual_wait_start_result.instance_id == mockInstanceId
113+
assert actual_wait_start_result.instance_id == mock_instance_id
100114

101115
actual_wait_completion_result = wfClient.wait_for_workflow_completion(
102-
instance_id=mockInstanceId, timeout_in_seconds=30
116+
instance_id=mock_instance_id, timeout_in_seconds=30
103117
)
104118
assert actual_wait_completion_result.runtime_status.name == 'COMPLETED'
105-
assert actual_wait_completion_result.instance_id == mockInstanceId
119+
assert actual_wait_completion_result.instance_id == mock_instance_id
106120

107121
actual_raise_event_result = wfClient.raise_workflow_event(
108-
instance_id=mockInstanceId, event_name='test_event', data='test_data'
122+
instance_id=mock_instance_id, event_name='test_event', data='test_data'
109123
)
110124
assert actual_raise_event_result == mock_raise_event_result
111125

112126
actual_terminate_result = wfClient.terminate_workflow(
113-
instance_id=mockInstanceId, output='test_output'
127+
instance_id=mock_instance_id, output='test_output'
114128
)
115129
assert actual_terminate_result == mock_terminate_result
116130

117-
actual_suspend_result = wfClient.pause_workflow(instance_id=mockInstanceId)
131+
actual_suspend_result = wfClient.pause_workflow(instance_id=mock_instance_id)
118132
assert actual_suspend_result == mock_suspend_result
119133

120-
actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId)
134+
actual_resume_result = wfClient.resume_workflow(instance_id=mock_instance_id)
121135
assert actual_resume_result == mock_resume_result
136+
137+
actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id)
138+
assert actual_purge_result == mock_purge_result

setup.cfg

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[metadata]
22
url = https://dapr.io/
33
author = Dapr Authors
4-
author_email = [email protected]
4+
author_email = [email protected]
55
license = Apache
66
license_file = LICENSE
77
classifiers =

0 commit comments

Comments
 (0)