Skip to content

Commit

Permalink
Continue sketching
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed May 30, 2024
1 parent 7064b7a commit 0f335b4
Showing 1 changed file with 47 additions and 42 deletions.
89 changes: 47 additions & 42 deletions update/job_runner_I1_native.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ class JobOutput:
UpdateID = str
Workflow = Type


@dataclass
class Update:
id: UpdateID
arg: I


_sdk_internals_pending_tasks_count = 0
_sdk_internals_handler_mutex = asyncio.Lock()

Expand All @@ -54,11 +61,11 @@ def _sdk_internals_all_handlers_completed(self) -> bool:

@asynccontextmanager
async def _sdk_internals__track_pending__wait_until_ready__serialize_execution(
ready_condition: Callable[[], bool]
execute_condition: Callable[[], bool]
):
global _sdk_internals_pending_tasks_count
_sdk_internals_pending_tasks_count += 1
await workflow.wait_condition(ready_condition)
await workflow.wait_condition(execute_condition)
await _sdk_internals_handler_mutex.acquire()
try:
yield
Expand All @@ -72,29 +79,30 @@ class SDKInternals:
# pending tasks tracking, and synchronization functionality. This is a fake implementation: the
# real implementation will automatically inspect and wrap the user's declared update handlers.

def ready_to_execute(self, arg: I) -> bool:
# Implemented by user
def ready_to_execute(self, update: Update) -> bool:
# Overridden by users who wish to control order of execution
return True

@workflow.update
async def run_shell_script_job(self, arg: I) -> O:
handler = getattr(self, "_" + inspect.currentframe().f_code.co_name)
async with _sdk_internals__track_pending__wait_until_ready__serialize_execution(
lambda: self.ready_to_execute(arg)
lambda: self.ready_to_execute(Update(arg.id, arg))
):
return await handler(arg)

@workflow.update
async def run_python_job(self, arg: I) -> O:
handler = getattr(self, "_" + inspect.currentframe().f_code.co_name)
async with _sdk_internals__track_pending__wait_until_ready__serialize_execution(
lambda: self.ready_to_execute(arg)
lambda: self.ready_to_execute(Update(arg.id, arg))
):
return await handler(arg)


# Monkey-patch proposed new public API
setattr(workflow, "all_handlers_completed", _sdk_internals_all_handlers_completed)
setattr(workflow, "Update", Update)
##
## END SDK internals prototype
##
Expand All @@ -115,12 +123,13 @@ async def run(self):
await workflow.wait_condition(
lambda: (
workflow.info().is_continue_as_new_suggested()
and self.all_handlers_completed()
and workflow.all_handlers_completed()
)
)
workflow.continue_as_new()

def ready_to_execute(self, job: Job) -> bool:
def ready_to_execute(self, update: workflow.Update) -> bool:
job = update.arg
if not set(job.depends_on) <= self.completed_tasks:
return False
if after_time := job.after_time:
Expand All @@ -129,45 +138,41 @@ def ready_to_execute(self, job: Job) -> bool:
return True

# These are the real handler functions. When we implement SDK support, these will use the
# @workflow.update decorator and will not use an underscore prefix.
# decorator form commented out below, and will not use an underscore prefix.

# @workflow.update
# @workflow.update(execute_condition=ready_to_execute)
async def _run_shell_script_job(self, job: Job) -> JobOutput:
try:
if security_errors := await workflow.execute_activity(
run_shell_script_security_linter,
args=[job.run],
start_to_close_timeout=timedelta(seconds=10),
):
return JobOutput(status=1, stdout="", stderr=security_errors)
job_output = await workflow.execute_activity(
run_job, args=[job], start_to_close_timeout=timedelta(seconds=10)
)
return job_output
finally:
# FIXME: unbounded memory usage
self.completed_tasks.add(job.id)
if security_errors := await workflow.execute_activity(
run_shell_script_security_linter,
args=[job.run],
start_to_close_timeout=timedelta(seconds=10),
):
return JobOutput(status=1, stdout="", stderr=security_errors)
job_output = await workflow.execute_activity(
run_job, args=[job], start_to_close_timeout=timedelta(seconds=10)
)
# FIXME: unbounded memory usage
self.completed_tasks.add(job.id)
return job_output

# @workflow.update
# @workflow.update(execute_condition=ready_to_execute)
async def _run_python_job(self, job: Job) -> JobOutput:
try:
if not await workflow.execute_activity(
check_python_interpreter_version,
args=[job.python_interpreter_version],
start_to_close_timeout=timedelta(seconds=10),
):
return JobOutput(
status=1,
stdout="",
stderr=f"Python interpreter version {job.python_interpreter_version} is not available",
)
job_output = await workflow.execute_activity(
run_job, args=[job], start_to_close_timeout=timedelta(seconds=10)
if not await workflow.execute_activity(
check_python_interpreter_version,
args=[job.python_interpreter_version],
start_to_close_timeout=timedelta(seconds=10),
):
return JobOutput(
status=1,
stdout="",
stderr=f"Python interpreter version {job.python_interpreter_version} is not available",
)
return job_output
finally:
# FIXME: unbounded memory usage
self.completed_tasks.add(job.id)
job_output = await workflow.execute_activity(
run_job, args=[job], start_to_close_timeout=timedelta(seconds=10)
)
# FIXME: unbounded memory usage
self.completed_tasks.add(job.id)
return job_output


@activity.defn
Expand Down

0 comments on commit 0f335b4

Please sign in to comment.