diff --git a/.github/workflows/workflow.yaml b/.github/workflows/workflow.yaml index bd2c53b..2b93a69 100644 --- a/.github/workflows/workflow.yaml +++ b/.github/workflows/workflow.yaml @@ -24,4 +24,4 @@ jobs: contents: write pull-requests: write with: - plugin_path: "arcaflow_plugin_wait/wait_plugin.py" + plugin_path: "arcaflow_plugin_wait/arcaflow_plugin_wait.py" diff --git a/Dockerfile b/Dockerfile index a08f474..e62400c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -40,7 +40,7 @@ RUN python -m pip install -r requirements.txt WORKDIR /app/${package} -ENTRYPOINT ["python", "wait_plugin.py"] +ENTRYPOINT ["python", "arcaflow_plugin_wait.py"] CMD [] LABEL org.opencontainers.image.source="https://github.com/arcalot/arcaflow-plugin-wait" diff --git a/README.md b/README.md index bb66a84..eb15825 100644 --- a/README.md +++ b/README.md @@ -41,15 +41,42 @@ Waits for the given amount of time ### Outputs +#### cancelled_early + +<table><tbody> +<tr><th>Type:</th><td><code>scope</code></td><tr><th>Root object:</th><td>ErrorOutput</td></tr> +<tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary> + <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td> +</tbody></table> + </details><details><summary>error (<code>string</code>)</summary> + <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table> + </details></td></tr> +<tr><td colspan="2"><details><summary><strong>Objects</strong></summary><details><summary>ErrorOutput (<code>object</code>)</summary> + <table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary> + <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td> +</tbody></table> + </details><details><summary>error (<code>string</code>)</summary> + <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table> + </details></td></tr> +</tbody></table> + </details></details></td></tr> +</tbody></table> + #### error <table><tbody> <tr><th>Type:</th><td><code>scope</code></td><tr><th>Root object:</th><td>ErrorOutput</td></tr> -<tr><th>Properties</th><td><details><summary>error (<code>string</code>)</summary> +<tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary> + <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td> +</tbody></table> + </details><details><summary>error (<code>string</code>)</summary> <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table> </details></td></tr> <tr><td colspan="2"><details><summary><strong>Objects</strong></summary><details><summary>ErrorOutput (<code>object</code>)</summary> - <table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>error (<code>string</code>)</summary> + <table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary> + <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td> +</tbody></table> + </details><details><summary>error (<code>string</code>)</summary> <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table> </details></td></tr> </tbody></table> @@ -60,11 +87,17 @@ Waits for the given amount of time <table><tbody> <tr><th>Type:</th><td><code>scope</code></td><tr><th>Root object:</th><td>SuccessOutput</td></tr> -<tr><th>Properties</th><td><details><summary>message (<code>string</code>)</summary> +<tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary> + <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td> +</tbody></table> + </details><details><summary>message (<code>string</code>)</summary> <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table> </details></td></tr> <tr><td colspan="2"><details><summary><strong>Objects</strong></summary><details><summary>SuccessOutput (<code>object</code>)</summary> - <table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>message (<code>string</code>)</summary> + <table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>actual_wait_seconds (<code>float</code>)</summary> + <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>float</code></td> +</tbody></table> + </details><details><summary>message (<code>string</code>)</summary> <table><tbody><tr><th>Required:</th><td>Yes</td></tr><tr><th>Type:</th><td><code>string</code></td></tbody></table> </details></td></tr> </tbody></table> diff --git a/arcaflow_plugin_wait/arcaflow_plugin_wait.py b/arcaflow_plugin_wait/arcaflow_plugin_wait.py new file mode 100644 index 0000000..fae2d7a --- /dev/null +++ b/arcaflow_plugin_wait/arcaflow_plugin_wait.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 + +import sys +import time +import typing +from dataclasses import dataclass, field +from threading import Event + +from arcaflow_plugin_sdk import plugin, validation, predefined_schemas + + +@dataclass +class InputParams: + seconds: typing.Annotated[float, validation.min(0.0)] = field( + metadata={ + "id": "seconds", + "name": "seconds", + "description": "number of seconds to wait as a floating point " + "number for subsecond precision." + } + ) + + +@dataclass +class SuccessOutput: + """ + This is the output data structure for the success case. + """ + message: str + actual_wait_seconds: float + + +@dataclass +class ErrorOutput: + """ + This is the output data structure in the error case. + """ + error: str + actual_wait_seconds: float + + +class WaitStep: + exit = Event() + finished_early = False + + @plugin.signal_handler( + id=predefined_schemas.cancel_signal_schema.id, + name=predefined_schemas.cancel_signal_schema.display.name, + description=predefined_schemas.cancel_signal_schema.display. + description, + icon=predefined_schemas.cancel_signal_schema.display.icon, + ) + def cancel_step(self, _input: predefined_schemas.cancelInput): + # First, let it know that this is the reason it's exiting. + self.finished_early = True + # Now signal to exit. + self.exit.set() + + @plugin.step_with_signals( + id="wait", + name="Wait", + description="Waits for the given amount of time", + outputs={ + "success": SuccessOutput, + "error": ErrorOutput, + "cancelled_early": ErrorOutput + }, + signal_handler_method_names=["cancel_step"], + signal_emitters=[], + step_object_constructor=lambda: WaitStep(), + ) + def wait( + self, + params: InputParams, + ) -> typing.Tuple[str, typing.Union[SuccessOutput, ErrorOutput]]: + """ + :param params: + + :return: the string identifying which output it is, + as well the output structure + """ + start_time = time.time() + self.exit.wait(params.seconds) + if self.finished_early: + actual_time = time.time() - start_time + return "cancelled_early", ErrorOutput( + "Aborted {:0.2f} seconds after being scheduled to wait for {}" + " seconds.".format(actual_time, params.seconds), + actual_time + ) + else: + actual_time = time.time() - start_time + return "success", SuccessOutput( + "Waited {:0.2f} seconds after being scheduled to wait for {}" + " seconds.".format(actual_time, params.seconds), + actual_time + ) + + +if __name__ == "__main__": + sys.exit(plugin.run(plugin.build_schema( + WaitStep.wait, + ))) diff --git a/arcaflow_plugin_wait/wait_plugin.py b/arcaflow_plugin_wait/wait_plugin.py deleted file mode 100644 index 8a4d640..0000000 --- a/arcaflow_plugin_wait/wait_plugin.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python3 - -import sys -import typing -from dataclasses import dataclass, field -from time import sleep - -from arcaflow_plugin_sdk import plugin, validation - - -@dataclass -class InputParams: - seconds: typing.Annotated[float, validation.min(0.0)] = field( - metadata={ - "id": "seconds", - "name": "seconds", - "description": "number of seconds to wait as a floating point " - "number for subsecond precision." - } - ) - - -@dataclass -class SuccessOutput: - """ - This is the output data structure for the success case. - """ - message: str - - -@dataclass -class ErrorOutput: - """ - This is the output data structure in the error case. - """ - error: str - - -@plugin.step( - id="wait", - name="Wait", - description="Waits for the given amount of time", - outputs={"success": SuccessOutput, "error": ErrorOutput}, -) -def wait( - params: InputParams -) -> typing.Tuple[str, typing.Union[SuccessOutput, ErrorOutput]]: - """ - :param params: - - :return: the string identifying which output it is, - as well the output structure - """ - try: - sleep(params.seconds) - return "success", SuccessOutput( - "Waited for {} seconds".format(params.seconds) - ) - except BaseException: - return "error", ErrorOutput( - "Failed waiting for {} seconds".format(params.seconds) - ) - - -if __name__ == "__main__": - sys.exit(plugin.run(plugin.build_schema( - wait, - ))) diff --git a/poetry.lock b/poetry.lock index 889b291..b8cc241 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "arcaflow-plugin-sdk" diff --git a/pyproject.toml b/pyproject.toml index 4707883..cb3712c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,7 @@ classifiers = [ ] homepage = "https://github.com/arcalot/arcaflow-plugin-wait" packages = [ - { include="wait_plugin.py", from="./arcaflow_plugin_wait" }, + { include="arcaflow_plugin_wait.py", from="./arcaflow_plugin_wait" }, ] [tool.poetry.dependencies] diff --git a/tests/test_arcaflow_plugin_wait.py b/tests/test_arcaflow_plugin_wait.py index 76c995c..ee8f3ea 100644 --- a/tests/test_arcaflow_plugin_wait.py +++ b/tests/test_arcaflow_plugin_wait.py @@ -1,48 +1,67 @@ #!/usr/bin/env python3 import unittest -from arcaflow_plugin_sdk import plugin +from arcaflow_plugin_sdk import plugin, predefined_schemas -import wait_plugin +import time +import arcaflow_plugin_wait WAIT_TIME = 0.1 +SKIPPED_WAIT_TIME = 1.0 +PREMATURE_TIME = 0.05 class WaitTest(unittest.TestCase): @staticmethod def test_serialization(): plugin.test_object_serialization( - wait_plugin.InputParams( + arcaflow_plugin_wait.InputParams( WAIT_TIME ) ) plugin.test_object_serialization( - wait_plugin.SuccessOutput( - "Waited for {} seconds".format(WAIT_TIME) + arcaflow_plugin_wait.SuccessOutput( + "Waited {:0.2f} seconds after being scheduled to wait for" + " {} seconds.".format(WAIT_TIME, WAIT_TIME), + actual_wait_seconds=WAIT_TIME ) ) plugin.test_object_serialization( - wait_plugin.ErrorOutput( - error="Failed waiting for {} seconds".format(WAIT_TIME) + arcaflow_plugin_wait.ErrorOutput( + error="Aborted {:0.2f} seconds after being scheduled to wait" + " for {} seconds.".format(PREMATURE_TIME, WAIT_TIME), + actual_wait_seconds=PREMATURE_TIME ) ) def test_functional(self): - input_params = wait_plugin.InputParams( + # Test simple wait + input_params = arcaflow_plugin_wait.InputParams( seconds=WAIT_TIME ) - - output_id, output_data = wait_plugin.wait(input_params) + wait_step = arcaflow_plugin_wait.WaitStep() + output_id, output_data = wait_step.wait(input_params) self.assertEqual("success", output_id) self.assertEqual( - output_data, - wait_plugin.SuccessOutput( - "Waited for {} seconds".format(WAIT_TIME) - ) + output_data.message, + "Waited {:0.2f} seconds after being scheduled to wait for {}" + " seconds.".format(output_data.actual_wait_seconds, WAIT_TIME) + ) + # Test cancellation + input_params = arcaflow_plugin_wait.InputParams( + seconds=SKIPPED_WAIT_TIME ) + wait_step = arcaflow_plugin_wait.WaitStep() + wait_step.cancel_step(wait_step, predefined_schemas.cancelInput()) + start_time = time.time() + output_id, output_data = wait_step.wait(input_params) + run_duration = time.time() - start_time + # It starts in a cancelled state, so it should be plenty less than + # the full wait time. + self.assertLess(run_duration, SKIPPED_WAIT_TIME / 2.0) if __name__ == '__main__':