Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Early abort support (#30)
Browse files Browse the repository at this point in the history
* Attempt at making early exit work

* Handle the case where it doesn't respond to the signal

* Changed output messages, and fixed linter errors

* Get SIGTERM early termination working in some cases

* Fix linting errors

This version still does not work well for early cancellation

* Update to signals for cancellation

* Fix linting errors

* Update to newest signals api

* Fix linting error

* Update SDK

* Update SDK

* Update SDK

* Update SDK

* Update SDK

* Update SDK

* Update SDK

* Update SDK

* Update SDK

* Change module name

* Temporarily disable signal setting for debugging

* Update SDK

* Update SDK

* Update SDK

* Re-enabled exit event, and added missing cancelled_early output

* Fix linting

* Add git to dockerfile to work with dev version of SDK

* Fix git install

* Fix file name reference in Dockerfile

* Update to release of python SDK

* Remove git install

No longer needed due to the required dependency being on pypi

* Automatic upate of README.md by arcaflow-docsgen arcabot

---------

Co-authored-by: arcabot <[email protected]>
  • Loading branch information
jaredoconnell and arcalot-bot authored Sep 22, 2023
1 parent d4c1a5e commit 20ab1d1
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 90 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ jobs:
contents: write
pull-requests: write
with:
plugin_path: "arcaflow_plugin_wait/wait_plugin.py"
plugin_path: "arcaflow_plugin_wait/arcaflow_plugin_wait.py"
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ RUN python -m pip install -r requirements.txt

WORKDIR /app/${package}

ENTRYPOINT ["python", "wait_plugin.py"]
ENTRYPOINT ["python", "arcaflow_plugin_wait.py"]
CMD []

LABEL org.opencontainers.image.source="https://github.com/arcalot/arcaflow-plugin-wait"
Expand Down
41 changes: 37 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,42 @@ Waits for the given amount of time
### Outputs


#### cancelled_early

<table><tbody>
<tr><th>Type:</th><td><code>scope</code></td><tr><th>Root object:</th><td>ErrorOutput</td></tr>
<tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td>
</tbody></table>
</details><details><summary>error (<code>string</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details></td></tr>
<tr><td colspan="2"><details><summary><strong>Objects</strong></summary><details><summary>ErrorOutput (<code>object</code>)</summary>
<table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td>
</tbody></table>
</details><details><summary>error (<code>string</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details></td></tr>
</tbody></table>
</details></details></td></tr>
</tbody></table>

#### error

<table><tbody>
<tr><th>Type:</th><td><code>scope</code></td><tr><th>Root object:</th><td>ErrorOutput</td></tr>
<tr><th>Properties</th><td><details><summary>error (<code>string</code>)</summary>
<tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td>
</tbody></table>
</details><details><summary>error (<code>string</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details></td></tr>
<tr><td colspan="2"><details><summary><strong>Objects</strong></summary><details><summary>ErrorOutput (<code>object</code>)</summary>
<table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>error (<code>string</code>)</summary>
<table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td>
</tbody></table>
</details><details><summary>error (<code>string</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details></td></tr>
</tbody></table>
Expand All @@ -60,11 +87,17 @@ Waits for the given amount of time

<table><tbody>
<tr><th>Type:</th><td><code>scope</code></td><tr><th>Root object:</th><td>SuccessOutput</td></tr>
<tr><th>Properties</th><td><details><summary>message (<code>string</code>)</summary>
<tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td>
</tbody></table>
</details><details><summary>message (<code>string</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details></td></tr>
<tr><td colspan="2"><details><summary><strong>Objects</strong></summary><details><summary>SuccessOutput (<code>object</code>)</summary>
<table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>message (<code>string</code>)</summary>
<table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td>
</tbody></table>
</details><details><summary>message (<code>string</code>)</summary>
<table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table>
</details></td></tr>
</tbody></table>
Expand Down
103 changes: 103 additions & 0 deletions arcaflow_plugin_wait/arcaflow_plugin_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#!/usr/bin/env python3

import sys
import time
import typing
from dataclasses import dataclass, field
from threading import Event

from arcaflow_plugin_sdk import plugin, validation, predefined_schemas


@dataclass
class InputParams:
seconds: typing.Annotated[float, validation.min(0.0)] = field(
metadata={
"id": "seconds",
"name": "seconds",
"description": "number of seconds to wait as a floating point "
"number for subsecond precision."
}
)


@dataclass
class SuccessOutput:
"""
This is the output data structure for the success case.
"""
message: str
actual_wait_seconds: float


@dataclass
class ErrorOutput:
"""
This is the output data structure in the error case.
"""
error: str
actual_wait_seconds: float


class WaitStep:
exit = Event()
finished_early = False

@plugin.signal_handler(
id=predefined_schemas.cancel_signal_schema.id,
name=predefined_schemas.cancel_signal_schema.display.name,
description=predefined_schemas.cancel_signal_schema.display.
description,
icon=predefined_schemas.cancel_signal_schema.display.icon,
)
def cancel_step(self, _input: predefined_schemas.cancelInput):
# First, let it know that this is the reason it's exiting.
self.finished_early = True
# Now signal to exit.
self.exit.set()

@plugin.step_with_signals(
id="wait",
name="Wait",
description="Waits for the given amount of time",
outputs={
"success": SuccessOutput,
"error": ErrorOutput,
"cancelled_early": ErrorOutput
},
signal_handler_method_names=["cancel_step"],
signal_emitters=[],
step_object_constructor=lambda: WaitStep(),
)
def wait(
self,
params: InputParams,
) -> typing.Tuple[str, typing.Union[SuccessOutput, ErrorOutput]]:
"""
:param params:
:return: the string identifying which output it is,
as well the output structure
"""
start_time = time.time()
self.exit.wait(params.seconds)
if self.finished_early:
actual_time = time.time() - start_time
return "cancelled_early", ErrorOutput(
"Aborted {:0.2f} seconds after being scheduled to wait for {}"
" seconds.".format(actual_time, params.seconds),
actual_time
)
else:
actual_time = time.time() - start_time
return "success", SuccessOutput(
"Waited {:0.2f} seconds after being scheduled to wait for {}"
" seconds.".format(actual_time, params.seconds),
actual_time
)


if __name__ == "__main__":
sys.exit(plugin.run(plugin.build_schema(
WaitStep.wait,
)))
68 changes: 0 additions & 68 deletions arcaflow_plugin_wait/wait_plugin.py

This file was deleted.

2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ classifiers = [
]
homepage = "https://github.com/arcalot/arcaflow-plugin-wait"
packages = [
{ include="wait_plugin.py", from="./arcaflow_plugin_wait" },
{ include="arcaflow_plugin_wait.py", from="./arcaflow_plugin_wait" },
]

[tool.poetry.dependencies]
Expand Down
47 changes: 33 additions & 14 deletions tests/test_arcaflow_plugin_wait.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,67 @@
#!/usr/bin/env python3
import unittest

from arcaflow_plugin_sdk import plugin
from arcaflow_plugin_sdk import plugin, predefined_schemas

import wait_plugin
import time
import arcaflow_plugin_wait

WAIT_TIME = 0.1
SKIPPED_WAIT_TIME = 1.0
PREMATURE_TIME = 0.05


class WaitTest(unittest.TestCase):
@staticmethod
def test_serialization():
plugin.test_object_serialization(
wait_plugin.InputParams(
arcaflow_plugin_wait.InputParams(
WAIT_TIME
)
)

plugin.test_object_serialization(
wait_plugin.SuccessOutput(
"Waited for {} seconds".format(WAIT_TIME)
arcaflow_plugin_wait.SuccessOutput(
"Waited {:0.2f} seconds after being scheduled to wait for"
" {} seconds.".format(WAIT_TIME, WAIT_TIME),
actual_wait_seconds=WAIT_TIME
)
)

plugin.test_object_serialization(
wait_plugin.ErrorOutput(
error="Failed waiting for {} seconds".format(WAIT_TIME)
arcaflow_plugin_wait.ErrorOutput(
error="Aborted {:0.2f} seconds after being scheduled to wait"
" for {} seconds.".format(PREMATURE_TIME, WAIT_TIME),
actual_wait_seconds=PREMATURE_TIME
)
)

def test_functional(self):
input_params = wait_plugin.InputParams(
# Test simple wait
input_params = arcaflow_plugin_wait.InputParams(
seconds=WAIT_TIME
)

output_id, output_data = wait_plugin.wait(input_params)
wait_step = arcaflow_plugin_wait.WaitStep()
output_id, output_data = wait_step.wait(input_params)

self.assertEqual("success", output_id)
self.assertEqual(
output_data,
wait_plugin.SuccessOutput(
"Waited for {} seconds".format(WAIT_TIME)
)
output_data.message,
"Waited {:0.2f} seconds after being scheduled to wait for {}"
" seconds.".format(output_data.actual_wait_seconds, WAIT_TIME)
)
# Test cancellation
input_params = arcaflow_plugin_wait.InputParams(
seconds=SKIPPED_WAIT_TIME
)
wait_step = arcaflow_plugin_wait.WaitStep()
wait_step.cancel_step(wait_step, predefined_schemas.cancelInput())
start_time = time.time()
output_id, output_data = wait_step.wait(input_params)
run_duration = time.time() - start_time
# It starts in a cancelled state, so it should be plenty less than
# the full wait time.
self.assertLess(run_duration, SKIPPED_WAIT_TIME / 2.0)


if __name__ == '__main__':
Expand Down

0 comments on commit 20ab1d1

Please sign in to comment.