diff --git a/hello/hello_update.py b/hello/hello_update.py index bf439b15..111d95b1 100644 --- a/hello/hello_update.py +++ b/hello/hello_update.py @@ -1,6 +1,6 @@ import asyncio -from temporalio import common, workflow +from temporalio import workflow from temporalio.client import Client from temporalio.worker import Worker @@ -37,7 +37,6 @@ async def main(): GreetingWorkflow.run, id="hello-update-workflow-id", task_queue="update-workflow-task-queue", - id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, ) # Perform the update for GreetingWorkflow diff --git a/tests/update/serialized_handling_of_n_messages.py b/tests/update/serialized_handling_of_n_messages.py deleted file mode 100644 index 5c78af37..00000000 --- a/tests/update/serialized_handling_of_n_messages.py +++ /dev/null @@ -1,95 +0,0 @@ -import asyncio -import logging -import uuid -from dataclasses import dataclass -from unittest.mock import patch - -import temporalio.api.common.v1 -import temporalio.api.enums.v1 -import temporalio.api.update.v1 -import temporalio.api.workflowservice.v1 -from temporalio.client import Client, WorkflowHandle -from temporalio.worker import Worker -from temporalio.workflow import UpdateMethodMultiParam - -from update.serialized_handling_of_n_messages import ( - MessageProcessor, - Result, - get_current_time, -) - - -async def test_continue_as_new_doesnt_lose_updates(client: Client): - with patch( - "temporalio.workflow.Info.is_continue_as_new_suggested", return_value=True - ): - tq = str(uuid.uuid4()) - wf = await client.start_workflow( - MessageProcessor.run, id=str(uuid.uuid4()), task_queue=tq - ) - update_requests = [ - UpdateRequest(wf, MessageProcessor.process_message, i) for i in range(10) - ] - for req in update_requests: - await req.wait_until_admitted() - - async with Worker( - client, - task_queue=tq, - workflows=[MessageProcessor], - activities=[get_current_time], - ): - for req in update_requests: - update_result = await req.task - assert update_result.startswith(req.expected_result_prefix()) - - -@dataclass -class UpdateRequest: - wf_handle: WorkflowHandle - update: UpdateMethodMultiParam - sequence_number: int - - def __post_init__(self): - self.task = asyncio.Task[Result]( - self.wf_handle.execute_update(self.update, args=[self.arg], id=self.id) - ) - - async def wait_until_admitted(self): - while True: - try: - return await self._poll_update_non_blocking() - except Exception as err: - logging.warning(err) - - async def _poll_update_non_blocking(self): - req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest( - namespace=self.wf_handle._client.namespace, - update_ref=temporalio.api.update.v1.UpdateRef( - workflow_execution=temporalio.api.common.v1.WorkflowExecution( - workflow_id=self.wf_handle.id, - run_id="", - ), - update_id=self.id, - ), - identity=self.wf_handle._client.identity, - ) - res = await self.wf_handle._client.workflow_service.poll_workflow_execution_update( - req - ) - # TODO: @cretz how do we work with these raw proto objects? - assert "stage: UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED" in str(res) - - @property - def arg(self) -> str: - return str(self.sequence_number) - - @property - def id(self) -> str: - return str(self.sequence_number) - - def expected_result_prefix(self) -> str: - # TODO: Currently the server does not send updates to the worker in order of admission When - # this is fixed (https://github.com/temporalio/temporal/pull/5831), we can make a stronger - # assertion about the activity numbers used to construct each result. - return f"{self.arg}-result" diff --git a/update/avoid_races_between_handler_and_main_coroutine.py b/update/avoid_races_between_handler_and_main_coroutine.py deleted file mode 100644 index 97e7f8f3..00000000 --- a/update/avoid_races_between_handler_and_main_coroutine.py +++ /dev/null @@ -1,158 +0,0 @@ -import asyncio -import logging -from datetime import timedelta - -from temporalio import activity, common, workflow -from temporalio.client import Client, WorkflowHandle -from temporalio.worker import Worker - -# Problem: You have the buggy workflow below. You need to fix it so that the workflow state is no -# longer garbled due to interleaving of the handler with the main workflow. -# -# Solution 1: Use a sync handler and process the message in the main workflow coroutine. -# -# Solution 2: Await a custom “handler complete” condition. - - -class WorkflowBase: - def __init__(self) -> None: - self.letters = [] - - async def do_multiple_async_tasks_that_mutate_workflow_state(self, text: str): - for i in range(len(text)): - letter = await workflow.execute_activity( - get_letter, - args=[text, i], - start_to_close_timeout=timedelta(seconds=10), - ) - self.letters.append(letter) - - -@workflow.defn -class AvoidHandlerAndWorkflowInterleavingIncorrect(WorkflowBase): - """ - This workflow implementation is incorrect: the handler execution interleaves with the main - workflow coroutine. - """ - - def __init__(self) -> None: - super().__init__() - self.handler_started = False - self.handler_finished = False - - @workflow.run - async def run(self) -> str: - await workflow.wait_condition(lambda: self.handler_started) - await self.do_multiple_async_tasks_that_mutate_workflow_state( - "world!" - ) # Bug: handler and main wf are now interleaving - - await workflow.wait_condition(lambda: self.handler_finished) - return "".join(self.letters) - - @workflow.update - async def update_that_does_multiple_async_tasks_that_mutate_workflow_state( - self, text: str - ): - self.handler_started = True - await self.do_multiple_async_tasks_that_mutate_workflow_state(text) - self.handler_finished = True - - -@workflow.defn -class AvoidHandlerAndWorkflowInterleavingCorrect1(WorkflowBase): - """ - Solution 1: sync handler enqueues work; splice work into the main wf coroutine so that it cannot - interleave with work of main wf coroutine. - """ - - def __init__(self) -> None: - super().__init__() - self.handler_text = asyncio.Future[str]() - self.handler_finished = False - - @workflow.run - async def run(self) -> str: - handler_input = await self.handler_text - await self.do_multiple_async_tasks_that_mutate_workflow_state(handler_input) - await self.do_multiple_async_tasks_that_mutate_workflow_state("world!") - await workflow.wait_condition(lambda: self.handler_finished) - return "".join(self.letters) - - # Note: sync handler - @workflow.update - def update_that_does_multiple_async_tasks_that_mutate_workflow_state( - self, text: str - ): - self.handler_text.set_result(text) - self.handler_finished = True - - -@workflow.defn -class AvoidHandlerAndWorkflowInterleavingCorrect2(WorkflowBase): - """ - Solution 2: async handler notifies when complete; main wf coroutine waits for this to avoid - interleaving its own work. - """ - - def __init__(self) -> None: - super().__init__() - self.handler_finished = False - - @workflow.run - async def run(self) -> str: - await workflow.wait_condition(lambda: self.handler_finished) - await self.do_multiple_async_tasks_that_mutate_workflow_state("world!") - return "".join(self.letters) - - @workflow.update - async def update_that_does_multiple_async_tasks_that_mutate_workflow_state( - self, text: str - ): - await self.do_multiple_async_tasks_that_mutate_workflow_state(text) - self.handler_finished = True - - -@activity.defn -async def get_letter(text: str, i: int) -> str: - return text[i] - - -async def app(wf: WorkflowHandle): - await wf.execute_update( - AvoidHandlerAndWorkflowInterleavingCorrect1.update_that_does_multiple_async_tasks_that_mutate_workflow_state, - args=["Hello "], - ) - print(await wf.result()) - - -async def main(): - client = await Client.connect("localhost:7233") - - async with Worker( - client, - task_queue="tq", - workflows=[ - AvoidHandlerAndWorkflowInterleavingIncorrect, - AvoidHandlerAndWorkflowInterleavingCorrect1, - AvoidHandlerAndWorkflowInterleavingCorrect2, - ], - activities=[get_letter], - ): - for wf in [ - AvoidHandlerAndWorkflowInterleavingIncorrect, - AvoidHandlerAndWorkflowInterleavingCorrect1, - AvoidHandlerAndWorkflowInterleavingCorrect2, - ]: - handle = await client.start_workflow( - wf.run, - id="wid", - task_queue="tq", - id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, - ) - await app(handle) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - asyncio.run(main()) diff --git a/update/job_runner_notes.py b/update/job_runner_notes.py deleted file mode 100644 index e3b7999a..00000000 --- a/update/job_runner_notes.py +++ /dev/null @@ -1,409 +0,0 @@ -import asyncio -from asyncio import Future -from collections import deque -from datetime import datetime, timedelta -import logging -from sys import version -from typing import Any, Iterator, Optional, TypedDict, Union - -from attr import dataclass -from temporalio import common, workflow, activity -from temporalio.client import Client, WorkflowHandle -from temporalio.worker import Worker - - -############################################################################### -## SDK internals -## -@dataclass -class Message: - type: str # maps to a handler if a matching handler exists - args: tuple[Any] # deserialized arg payloads - # Can expose other update/signal metadata - received_at: datetime - client_identity: str - - async def handle(self): - await workflow.call_handler(self.type, *self.args) - - -@dataclass -class Signal(Message): - pass - - -@dataclass -class Update(Message): - id: str # the update ID - - -# -# Raw incoming workflow events -# -# The incoming event stream contains newly delivered updates and signals. Perhaps -# ContinueAsNewSuggested could also be an event. -# -# We could have "UpdateReceived" as the incoming event, with UpdateAccepted/UpdateRejected emitted -# later. However, An alternative is that the SDK executes the update validators immediately, before -# the user has a chance to interact with the event stream. We'll adopt that version for now, since -# it involves fewer event types. -@dataclass -class SignalReceived: - signal: Signal - - -@dataclass -class UpdateRejected: - update: Update - - -@dataclass -class UpdateAccepted: - update: Update - - -@dataclass -class ContinueAsNewSuggested: - pass - - -IncomingWorkflowEvent = Union[ - UpdateAccepted, UpdateRejected, SignalReceived, ContinueAsNewSuggested -] - - -def workflow_incoming_event_stream() -> Iterator[IncomingWorkflowEvent]: ... - - -setattr(workflow, "incoming_event_stream", workflow_incoming_event_stream) -# -# Other events that are emitted automatically by a running workflow -# - - -@dataclass -class UpdateCompleted: - pass - - -class SignalHandlerReturned: - # This is tangential to work on updates. Introducing this event would introduce a new concept of - # "signal-processing finished", which could be used to help users wait for signal processing to - # finish before CAN / workflow return. The idea is that the event would be emitted when a signal - # handler returns. - pass - - -# -# Events that users can add to the event stream -# - - -class TimerFired: - pass - - -class CustomFutureResolved: - pass - - -EventStream = Iterator[ - Union[ - SignalReceived, - UpdateRejected, - UpdateAccepted, - ContinueAsNewSuggested, - UpdateCompleted, - SignalHandlerReturned, - TimerFired, - CustomFutureResolved, - ] -] - - -class Selector: - def get_event_stream(self) -> EventStream: ... - - -# By default, a workflow behaves as if it is doing the following: -def handle_incoming_events(): - for ev in workflow.incoming_event_stream(): - match ev: - case SignalReceived(signal): - asyncio.create_task(signal.handle()) - case UpdateAccepted(update): - asyncio.create_task(update.handle()) - - -HandlerType = str -UpdateID = str - - -# This class is just a toy prototype: this functionality will be implemented on WorkflowInstance in -# the SDK. -class WorkflowInternals: - def __init__(self): - self.incoming_event_stream = deque[IncomingWorkflowEvent]() - self.ready_to_execute_handler: dict[UpdateID, Future[None]] = {} - - def accept_update(self, update: Update): - # This will be done by the SDK prior to invoking handler, around - # https://github.com/temporalio/sdk-python/blob/11a97d1ab2ebfe8c973bf396b1e14077ec611e52/temporalio/worker/_workflow_instance.py#L506 - self.incoming_event_stream.append(UpdateAccepted(update)) - - # By default, handlers are ready to execute immediately after the update is accepted. - self.ready_to_execute_handler[update.id] = resolved_future(None) - - async def _wait_until_ready_to_execute(self, update_id: UpdateID): - await self.ready_to_execute_handler[update_id] - - -def resolved_future[X](result: X) -> Future[X]: - fut = Future() - fut.set_result(result) - return fut - - -workflow_internals = WorkflowInternals() - -############################################################################### -## -## User's code -## - -# A user may want to handle the event stream differently. - -# workflow API design must make the following two things convenient for users: -# - DrainBeforeWorkflowCompletion -# - NoInterleaving, optionally with CustomOrder - - -def make_event_stream() -> EventStream: - selector = Selector() - return selector.get_event_stream() - - -event_stream = make_event_stream() - -# -JobId = int -ClusterSlot = str - - -class Job(TypedDict): - update_id: UpdateID - depends_on: list[UpdateID] - after_time: Optional[int] - name: str - run: str - python_interpreter_version: Optional[str] - - -class JobOutput(TypedDict): - status: int - stdout: str - stderr: str - - -@workflow.defn -class JobRunner: - - def __init__(self): - self.jobs: dict[JobId, Job] = {} - - # Design notes - # ------------ - # Updates always have handler functions, and an update handler function is still just a normal - # handler function: it implements the handling logic and the return value. - # - # Every workflow will have an underlying event stream. By default, this yields the following - # events: - # - # - UpdateRejected - # - UpdateAccepted (UpdateEnqueued) - # - UpdateDequeued - # - UpdateCompleted - # - # The SDK will provide a default implementation of the event stream, looking something like this: - - # - # The handler will be invoked automatically by the SDK when the underlying event stream yields - # an UpdateDequeued event for this update ID. The user does not have to know anything about - # this: by default, handlers are executed before other workflow code, in order of update - # arrival. - - # The SDK is capable of automatically draining the event stream before workflow return / CAN, - # including an option for this draining to result in serial execution of the handlers (i.e. - # waiting for all async work scheduled by the handler to complete before the next handler is - # invoked, and not allowing the workflow to complete until all such work is complete.) Default - # behavior TBD. - # - # The event stream thus provides a way for users to wait until a specific message handler has - # completed, or until all message handlers have completed. These can be exposed via convenient - # `wait_for_X()` APIs, rather than interacting with the raw event stream - # - # Furthermore, users can optionally implement the EventStream themselves. This gives them - # precise control over the ordering of handler invocation with respect to all other workflow - # events (e.g. other update completions, and custom futures such as timers and - # workflow.wait_condition). - # - # TODO: does handler invocation remain automatic on yielding Dequeue, or is that too magical. An - # alternative would be for users to be able to call update.handle() on an update object obtained - # from an event object yielded by the event stream. - - @workflow.update - async def run_shell_script_job(self, job: Job) -> JobOutput: - """ - To be executed in order dictated by job dependency graph (see `jobs.depends_on`) and not - before `job.after_time`. - """ - ## SDK internals: please pretend this is implemented in the SDK - await workflow_internals._wait_until_ready_to_execute(job["update_id"]) - ## - - if security_errors := await workflow.execute_activity( - run_shell_script_security_linter, - args=[job["run"]], - start_to_close_timeout=timedelta(seconds=10), - ): - return JobOutput(status=1, stdout="", stderr=security_errors) - job_output = await workflow.execute_activity( - run_job, args=[job], start_to_close_timeout=timedelta(seconds=10) - ) # SDK emits UpdateCompleted - return job_output - - @workflow.update - async def run_python_job(self, job: Job) -> JobOutput: - """ - To be executed in order dictated by job dependency graph (see `jobs.depends_on`) and not - before `job.after_time`. - """ - ## SDK internals: please pretend this is implemented in the SDK - await workflow_internals._wait_until_ready_to_execute(job["update_id"]) - ## - - if not await workflow.execute_activity( - check_python_interpreter_version, - args=[job["python_interpreter_version"]], - start_to_close_timeout=timedelta(seconds=10), - ): - return JobOutput( - status=1, - stdout="", - stderr=f"Python interpreter version {version} is not available", - ) - job_output = await workflow.execute_activity( - run_job, args=[job], start_to_close_timeout=timedelta(seconds=10) - ) # SDK emits UpdateCompleted - return job_output - - @run_shell_script_job.validator - def validate_shell_script_job(self, job: Job): - ## SDK internals: please pretend this is implemented in the SDK - workflow_internals.accept_update( - Update( - type="run_shell_script_job", - args=(job,), - client_identity="some-client-id", - id=job["update_id"], - received_at=workflow.now(), - ) - ) - ## - - @run_python_job.validator - def validate_python_job(self, job: Job): - ## SDK internals: please pretend this is implemented in the SDK - workflow_internals.accept_update( - Update( - type="run_python_job", - args=(job,), - client_identity="some-client-id", - id=job["update_id"], - received_at=workflow.now(), - ) - ) - ## - - @workflow.run - async def run(self): - while not workflow.info().is_continue_as_new_suggested(): - await workflow.wait_condition(lambda: len(self.jobs) > 0) - workflow.continue_as_new() - - -@activity.defn -async def run_job(job: Job) -> JobOutput: - await asyncio.sleep(0.1) - stdout = f"Ran job {job["name"]} at {datetime.now()}" - print(stdout) - return JobOutput(status=0, stdout=stdout, stderr="") - - -@activity.defn -async def request_cluster_slot(job: Job) -> ClusterSlot: - await asyncio.sleep(0.1) - return "cluster-slot-token-abc123" - - -@activity.defn -async def run_shell_script_security_linter(code: str) -> str: - # The user's organization requires that all shell scripts pass an in-house linter that checks - # for shell scripting constructions deemed insecure. - await asyncio.sleep(0.1) - return "" - - -@activity.defn -async def check_python_interpreter_version(version: str) -> bool: - await asyncio.sleep(0.1) - version_is_available = True - return version_is_available - - -async def app(wf: WorkflowHandle): - job_1 = Job( - update_id="1", - depends_on=[], - after_time=None, - name="should-run-first", - run="echo 'Hello world 1!'", - python_interpreter_version=None, - ) - job_2 = Job( - update_id="2", - depends_on=["1"], - after_time=None, - name="should-run-second", - run="print('Hello world 2!')", - python_interpreter_version=None, - ) - job_2 = await wf.execute_update(JobRunner.run_python_job, job_2) - job_1 = await wf.execute_update(JobRunner.run_shell_script_job, job_1) - - -async def main(): - client = await Client.connect("localhost:7233") - async with Worker( - client, - task_queue="tq", - workflows=[JobRunner], - activities=[ - run_job, - run_shell_script_security_linter, - check_python_interpreter_version, - request_cluster_slot, - ], - ): - wf = await client.start_workflow( - JobRunner.run, - id="wid", - task_queue="tq", - id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, - ) - await app(wf) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - asyncio.run(main()) diff --git a/update/order_handling_of_n_messages.py b/update/order_handling_of_n_messages.py deleted file mode 100644 index 3169b74e..00000000 --- a/update/order_handling_of_n_messages.py +++ /dev/null @@ -1,116 +0,0 @@ -import asyncio -import logging -import random -from typing import Optional - -from temporalio import common, workflow -from temporalio.client import Client, WorkflowHandle -from temporalio.worker import Worker - -Payload = str -SerializedQueueState = tuple[int, list[tuple[int, Payload]]] - - -class OrderedQueue: - def __init__(self): - self._futures = {} - self.head = 0 - self.lock = asyncio.Lock() - - def add(self, item: Payload, position: int): - fut = self._futures.setdefault(position, asyncio.Future()) - if not fut.done(): - fut.set_result(item) - else: - workflow.logger.warn(f"duplicate message for position {position}") - - async def next(self) -> Payload: - async with self.lock: - payload = await self._futures.setdefault(self.head, asyncio.Future()) - # Note: user must delete the payload to avoid unbounded memory usage - del self._futures[self.head] - self.head += 1 - return payload - - def serialize(self) -> SerializedQueueState: - payloads = [(i, fut.result()) for i, fut in self._futures.items() if fut.done()] - return (self.head, payloads) - - # This is bad, but AFAICS it's the best we can do currently until we have a workflow init - # functionality in all SDKs (https://github.com/temporalio/features/issues/400). Currently the - # problem is: if a signal/update handler is sync, then it cannot wait for anything in the main - # wf coroutine. After CAN, a message may arrive in the first WFT, but the sync handler cannot - # wait for wf state to be initialized. So we are forced to update an *existing* queue with the - # carried-over state. - def update_from_serialized_state(self, serialized_state: SerializedQueueState): - head, payloads = serialized_state - self.head = head - for i, p in payloads: - if i in self._futures: - workflow.logger.error(f"duplicate message {i} encountered when deserializing state carried over CAN") - else: - self._futures[i] = resolved_future(p) - - -def resolved_future[X](x: X) -> asyncio.Future[X]: - fut = asyncio.Future[X]() - fut.set_result(x) - return fut - - - -@workflow.defn -class MessageProcessor: - def __init__(self) -> None: - self.queue = OrderedQueue() - - @workflow.run - async def run(self, serialized_queue_state: Optional[SerializedQueueState] = None): - # Initialize workflow state after CAN. Note that handler is sync, so it cannot wait for - # workflow initialization. - if serialized_queue_state: - self.queue.update_from_serialized_state(serialized_queue_state) - while True: - workflow.logger.info(f"waiting for msg {self.queue.head + 1}") - payload = await self.queue.next() - workflow.logger.info(payload) - if workflow.info().is_continue_as_new_suggested(): - workflow.logger.info("CAN") - workflow.continue_as_new(args=[self.queue.serialize()]) - - # Note: sync handler - @workflow.update - def process_message(self, sequence_number: int, payload: Payload): - self.queue.add(payload, sequence_number) - - -async def app(wf: WorkflowHandle): - sequence_numbers = list(range(100)) - random.shuffle(sequence_numbers) - for i in sequence_numbers: - print(f"sending update {i}") - await wf.execute_update( - MessageProcessor.process_message, args=[i, f"payload {i}"] - ) - - -async def main(): - client = await Client.connect("localhost:7233") - - async with Worker( - client, - task_queue="tq", - workflows=[MessageProcessor], - ): - wf = await client.start_workflow( - MessageProcessor.run, - id="wid", - task_queue="tq", - id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, - ) - await asyncio.gather(app(wf), wf.result()) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - asyncio.run(main()) diff --git a/update/order_handling_of_two_messages.py b/update/order_handling_of_two_messages.py deleted file mode 100644 index 0aa0234a..00000000 --- a/update/order_handling_of_two_messages.py +++ /dev/null @@ -1,79 +0,0 @@ -import asyncio -import logging -from dataclasses import dataclass -from typing import Optional - -from temporalio import common, workflow -from temporalio.client import Client, WorkflowHandle -from temporalio.worker import Worker - - -# Shows how to make a pair of update or signal handlers run in a certain order even -# if they are received out of order. -@workflow.defn -class WashAndDryCycle: - - @dataclass - class WashResults: - num_items: int - - @dataclass - class DryResults: - num_items: int - moisture_level: int - - def __init__(self) -> None: - self._wash_results: Optional[WashAndDryCycle.WashResults] = None - self._dry_results: Optional[WashAndDryCycle.DryResults] = None - - @workflow.run - async def run(self): - await workflow.wait_condition(lambda: self._dry_results is not None) - assert self._dry_results - workflow.logger.info( - f"Finished washing and drying {self._dry_results.num_items} items, moisture level: {self._dry_results.moisture_level}" - ) - - @workflow.update - async def wash(self, num_items) -> WashResults: - self._wash_results = WashAndDryCycle.WashResults(num_items=num_items) - return self._wash_results - - @workflow.update - async def dry(self) -> DryResults: - await workflow.wait_condition(lambda: self._wash_results is not None) - assert self._wash_results - self._dry_results = WashAndDryCycle.DryResults( - num_items=self._wash_results.num_items, moisture_level=3 - ) - return self._dry_results - - -async def app(wf: WorkflowHandle): - # In normal operation, wash comes before dry, but here we simulate out-of-order receipt of messages - await asyncio.gather( - wf.execute_update(WashAndDryCycle.dry), - wf.execute_update(WashAndDryCycle.wash, 10), - ) - - -async def main(): - client = await Client.connect("localhost:7233") - - async with Worker( - client, - task_queue="tq", - workflows=[WashAndDryCycle], - ): - handle = await client.start_workflow( - WashAndDryCycle.run, - id="wid", - task_queue="tq", - id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, - ) - await app(handle) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - asyncio.run(main()) diff --git a/update/order_handling_relative_to_main_workflow.py b/update/order_handling_relative_to_main_workflow.py deleted file mode 100644 index 44baa300..00000000 --- a/update/order_handling_relative_to_main_workflow.py +++ /dev/null @@ -1,83 +0,0 @@ -import asyncio -import logging -from datetime import timedelta - -from temporalio import activity, common, workflow -from temporalio.client import Client, WorkflowHandle -from temporalio.worker import Worker - - -@workflow.defn -class WashAndDryCycle: - - def __init__(self) -> None: - self.has_detergent = False - self.wash_complete = False - self.non_dryables_removed = False - self.dry_complete = False - - @workflow.run - async def run(self): - await workflow.execute_activity( - add_detergent, start_to_close_timeout=timedelta(seconds=10) - ) - self.has_detergent = True - await workflow.wait_condition(lambda: self.wash_complete) - await workflow.execute_activity( - remove_non_dryables, start_to_close_timeout=timedelta(seconds=10) - ) - self.non_dryables_removed = True - await workflow.wait_condition(lambda: self.dry_complete) - - @workflow.update - async def wash(self): - await workflow.wait_condition(lambda: self.has_detergent) - self.wash_complete = True - workflow.logger.info("washing") - - @workflow.update - async def dry(self): - await workflow.wait_condition( - lambda: self.wash_complete and self.non_dryables_removed - ) - self.dry_complete = True - workflow.logger.info("drying") - - -@activity.defn -async def add_detergent(): - print("adding detergent") - - -@activity.defn -async def remove_non_dryables(): - print("removing non-dryables") - - -async def app(wf: WorkflowHandle): - await asyncio.gather( - wf.execute_update(WashAndDryCycle.dry), wf.execute_update(WashAndDryCycle.wash) - ) - - -async def main(): - client = await Client.connect("localhost:7233") - - async with Worker( - client, - task_queue="tq", - workflows=[WashAndDryCycle], - activities=[add_detergent, remove_non_dryables], - ): - handle = await client.start_workflow( - WashAndDryCycle.run, - id="wid", - task_queue="tq", - id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, - ) - await app(handle) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - asyncio.run(main()) diff --git a/update/serialized_handling_of_n_messages.py b/update/serialized_handling_of_n_messages.py deleted file mode 100644 index 4c9a0d5c..00000000 --- a/update/serialized_handling_of_n_messages.py +++ /dev/null @@ -1,114 +0,0 @@ -import asyncio -import logging -from asyncio import Future -from collections import deque -from datetime import timedelta - -from temporalio import activity, common, workflow -from temporalio.client import Client, WorkflowHandle -from temporalio.worker import Worker - -Arg = str -Result = str - -# Problem: -# ------- -# - Your workflow receives an unbounded number of updates. -# - Each update must be processed by calling two activities. -# - The next update may not start processing until the previous has completed. - -# Solution: -# -------- -# Enqueue updates, and process items from the queue in a single coroutine (the main workflow -# coroutine). - -# Discussion: -# ---------- -# The queue is used because Temporal's async update & signal handlers will interleave if they -# contain multiple yield points. An alternative would be to use standard async handler functions, -# with handling being done with an asyncio.Lock held. The queue approach would be necessary if we -# need to process in an order other than arrival. - - -@workflow.defn -class MessageProcessor: - - def __init__(self): - self.queue = deque[tuple[Arg, Future[Result]]]() - - @workflow.run - async def run(self): - while True: - await workflow.wait_condition(lambda: len(self.queue) > 0) - while self.queue: - arg, fut = self.queue.popleft() - fut.set_result(await self.execute_processing_task(arg)) - if workflow.info().is_continue_as_new_suggested(): - # Footgun: If we don't let the event loop tick, then CAN will end the workflow - # before the update handler is notified that the result future has completed. - # See https://github.com/temporalio/features/issues/481 - await asyncio.sleep(0) # Let update handler complete - print("CAN") - return workflow.continue_as_new() - - # Note: handler must be async if we are both enqueuing, and returning an update result - # => We could add SDK APIs to manually complete updates. - @workflow.update - async def process_message(self, arg: Arg) -> Result: - # Footgun: handler may need to wait for workflow initialization after CAN - # See https://github.com/temporalio/features/issues/400 - # await workflow.wait_condition(lambda: hasattr(self, "queue")) - fut = Future[Result]() - self.queue.append((arg, fut)) # Note: update validation gates enqueue - return await fut - - async def execute_processing_task(self, arg: Arg) -> Result: - # The purpose of the two activities and the result string format is to permit checks that - # the activities of different tasks do not interleave. - t1, t2 = [ - await workflow.execute_activity( - get_current_time, start_to_close_timeout=timedelta(seconds=10) - ) - for _ in range(2) - ] - return f"{arg}-result-{t1}-{t2}" - - -time = 0 - - -@activity.defn -async def get_current_time() -> int: - global time - time += 1 - return time - - -async def app(wf: WorkflowHandle): - for i in range(20): - print(f"app(): sending update {i}") - result = await wf.execute_update(MessageProcessor.process_message, f"arg {i}") - print(f"app(): {result}") - - -async def main(): - client = await Client.connect("localhost:7233") - - async with Worker( - client, - task_queue="tq", - workflows=[MessageProcessor], - activities=[get_current_time], - ): - wf = await client.start_workflow( - MessageProcessor.run, - id="wid", - task_queue="tq", - id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, - ) - await asyncio.gather(app(wf), wf.result()) - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - asyncio.run(main()) diff --git a/update/serialized_handling_of_n_messages_cretz.py b/update/serialized_handling_of_n_messages_cretz.py deleted file mode 100644 index 64bf2bf9..00000000 --- a/update/serialized_handling_of_n_messages_cretz.py +++ /dev/null @@ -1,48 +0,0 @@ -from collections import deque -from datetime import timedelta -from typing import Optional - -from temporalio import workflow - -# !!! -# This version requires update to complete with result and won't CAN until after -# everything is done -# !!! - - -class UpdateTask: - def __init__(self, arg: str) -> None: - self.arg = arg - self.result: Optional[str] = None - self.returned = False - - -@workflow.defn -class MessageProcessor: - def __init__(self) -> None: - self.queue: deque[UpdateTask] = deque() - - @workflow.run - async def run(self) -> None: - while not workflow.info().is_continue_as_new_suggested() or len(self.queue) > 0: - await workflow.wait_condition(lambda: len(self.queue) > 0) - await self.process_task(self.queue.popleft()) - workflow.continue_as_new() - - @workflow.update - async def do_task(self, arg: str) -> str: - # Add task and wait on result - task = UpdateTask(arg) - try: - self.queue.append(task) - await workflow.wait_condition(lambda: task.result is not None) - assert task.result - return task.result - finally: - task.returned = True - - async def process_task(self, task: UpdateTask) -> None: - task.result = await workflow.execute_activity( - "some_activity", task.arg, start_to_close_timeout=timedelta(seconds=10) - ) - await workflow.wait_condition(lambda: task.returned) diff --git a/update/serialized_handling_of_n_messages_cretz_with_new_apis.py b/update/serialized_handling_of_n_messages_cretz_with_new_apis.py deleted file mode 100644 index a60cf0c0..00000000 --- a/update/serialized_handling_of_n_messages_cretz_with_new_apis.py +++ /dev/null @@ -1,41 +0,0 @@ -from collections import deque -from datetime import timedelta -from typing import Optional - -from temporalio import workflow - -# !!! -# This version requires update to complete with result and won't CAN until after -# everything is done -# !!! - - -class UpdateTask: - def __init__(self, arg: str, update_id: str) -> None: - self.arg = arg - self.update_id: str - self.result: Optional[str] = None - - -@workflow.defn -class MessageProcessor: - def __init__(self) -> None: - self.queue: deque[UpdateTask] = deque() - - @workflow.run - async def run(self) -> None: - while not workflow.info().is_continue_as_new_suggested() or len(self.queue) > 0: - await workflow.wait_condition(lambda: len(self.queue) > 0) - await self.process_task(self.queue.popleft()) - workflow.continue_as_new() - - @workflow.update - async def do_task(self, arg: str) -> str: - task = UpdateTask(arg, update_id=workflow.current_update_id()) - self.queue.append(task) - await workflow.wait_condition(lambda: task.result is not None) - return task.result - - async def process_task(self, task: UpdateTask) -> None: - # execute_activity(...) - await workflow.wait_condition(lambda: workflow.update_completed(task.update_id))