diff --git a/update/job_runner_I1_native.py b/update/job_runner_I1_native.py index 494798b3..74a6d9ca 100644 --- a/update/job_runner_I1_native.py +++ b/update/job_runner_I1_native.py @@ -42,6 +42,13 @@ class JobOutput: UpdateID = str Workflow = Type + +@dataclass +class Update: + id: UpdateID + arg: I + + _sdk_internals_pending_tasks_count = 0 _sdk_internals_handler_mutex = asyncio.Lock() @@ -54,11 +61,11 @@ def _sdk_internals_all_handlers_completed(self) -> bool: @asynccontextmanager async def _sdk_internals__track_pending__wait_until_ready__serialize_execution( - ready_condition: Callable[[], bool] + execute_condition: Callable[[], bool] ): global _sdk_internals_pending_tasks_count _sdk_internals_pending_tasks_count += 1 - await workflow.wait_condition(ready_condition) + await workflow.wait_condition(execute_condition) await _sdk_internals_handler_mutex.acquire() try: yield @@ -72,15 +79,15 @@ class SDKInternals: # pending tasks tracking, and synchronization functionality. This is a fake implementation: the # real implementation will automatically inspect and wrap the user's declared update handlers. - def ready_to_execute(self, arg: I) -> bool: - # Implemented by user + def ready_to_execute(self, update: Update) -> bool: + # Overridden by users who wish to control order of execution return True @workflow.update async def run_shell_script_job(self, arg: I) -> O: handler = getattr(self, "_" + inspect.currentframe().f_code.co_name) async with _sdk_internals__track_pending__wait_until_ready__serialize_execution( - lambda: self.ready_to_execute(arg) + lambda: self.ready_to_execute(Update(arg.id, arg)) ): return await handler(arg) @@ -88,13 +95,14 @@ async def run_shell_script_job(self, arg: I) -> O: async def run_python_job(self, arg: I) -> O: handler = getattr(self, "_" + inspect.currentframe().f_code.co_name) async with _sdk_internals__track_pending__wait_until_ready__serialize_execution( - lambda: self.ready_to_execute(arg) + lambda: self.ready_to_execute(Update(arg.id, arg)) ): return await handler(arg) # Monkey-patch proposed new public API setattr(workflow, "all_handlers_completed", _sdk_internals_all_handlers_completed) +setattr(workflow, "Update", Update) ## ## END SDK internals prototype ## @@ -115,12 +123,13 @@ async def run(self): await workflow.wait_condition( lambda: ( workflow.info().is_continue_as_new_suggested() - and self.all_handlers_completed() + and workflow.all_handlers_completed() ) ) workflow.continue_as_new() - def ready_to_execute(self, job: Job) -> bool: + def ready_to_execute(self, update: workflow.Update) -> bool: + job = update.arg if not set(job.depends_on) <= self.completed_tasks: return False if after_time := job.after_time: @@ -129,45 +138,41 @@ def ready_to_execute(self, job: Job) -> bool: return True # These are the real handler functions. When we implement SDK support, these will use the - # @workflow.update decorator and will not use an underscore prefix. + # decorator form commented out below, and will not use an underscore prefix. - # @workflow.update + # @workflow.update(execute_condition=ready_to_execute) async def _run_shell_script_job(self, job: Job) -> JobOutput: - try: - if security_errors := await workflow.execute_activity( - run_shell_script_security_linter, - args=[job.run], - start_to_close_timeout=timedelta(seconds=10), - ): - return JobOutput(status=1, stdout="", stderr=security_errors) - job_output = await workflow.execute_activity( - run_job, args=[job], start_to_close_timeout=timedelta(seconds=10) - ) - return job_output - finally: - # FIXME: unbounded memory usage - self.completed_tasks.add(job.id) + if security_errors := await workflow.execute_activity( + run_shell_script_security_linter, + args=[job.run], + start_to_close_timeout=timedelta(seconds=10), + ): + return JobOutput(status=1, stdout="", stderr=security_errors) + job_output = await workflow.execute_activity( + run_job, args=[job], start_to_close_timeout=timedelta(seconds=10) + ) + # FIXME: unbounded memory usage + self.completed_tasks.add(job.id) + return job_output - # @workflow.update + # @workflow.update(execute_condition=ready_to_execute) async def _run_python_job(self, job: Job) -> JobOutput: - try: - if not await workflow.execute_activity( - check_python_interpreter_version, - args=[job.python_interpreter_version], - start_to_close_timeout=timedelta(seconds=10), - ): - return JobOutput( - status=1, - stdout="", - stderr=f"Python interpreter version {job.python_interpreter_version} is not available", - ) - job_output = await workflow.execute_activity( - run_job, args=[job], start_to_close_timeout=timedelta(seconds=10) + if not await workflow.execute_activity( + check_python_interpreter_version, + args=[job.python_interpreter_version], + start_to_close_timeout=timedelta(seconds=10), + ): + return JobOutput( + status=1, + stdout="", + stderr=f"Python interpreter version {job.python_interpreter_version} is not available", ) - return job_output - finally: - # FIXME: unbounded memory usage - self.completed_tasks.add(job.id) + job_output = await workflow.execute_activity( + run_job, args=[job], start_to_close_timeout=timedelta(seconds=10) + ) + # FIXME: unbounded memory usage + self.completed_tasks.add(job.id) + return job_output @activity.defn