From ad798497ba320716e2473ac7c52003b491b693ba Mon Sep 17 00:00:00 2001 From: Nathan Freeman Date: Mon, 9 Sep 2024 16:19:23 -0500 Subject: [PATCH] Add secrets engine to workflow engine --- src/api/specs/WorkflowsAPI.yaml | 16 +++++-- src/engine/src/contrib/tapis/TapisPlugin.py | 2 + .../engines/TapisSKSecretsEngine.py | 32 +++++++++++++ .../src/core/ioc/IOCContainerFactory.py | 8 +++- .../core/tasks/TaskInputFileStagingService.py | 21 +++++++-- .../src/core/workflows/ValueFromService.py | 18 +++++-- src/engine/src/owe_python_sdk/Plugin.py | 10 ++-- src/engine/src/owe_python_sdk/utils.py | 26 +++++++++- src/engine/src/tests/TestSDKUtils.py | 47 +++++++++++++++++++ src/engine/src/tests/run.sh | 6 ++- 10 files changed, 166 insertions(+), 20 deletions(-) create mode 100644 src/engine/src/contrib/tapis/middleware/engines/TapisSKSecretsEngine.py create mode 100644 src/engine/src/tests/TestSDKUtils.py diff --git a/src/api/specs/WorkflowsAPI.yaml b/src/api/specs/WorkflowsAPI.yaml index f5566114..670fa9d7 100644 --- a/src/api/specs/WorkflowsAPI.yaml +++ b/src/api/specs/WorkflowsAPI.yaml @@ -2542,11 +2542,11 @@ components: anyOf: - $ref: '#/components/schemas/DockerhubDestination' - $ref: '#/components/schemas/LocalDestination' - discriminator: - propertyName: type - mapping: - dockerhub: "#/components/schemas/DockerhubDestination" - local: "#/components/schemas/LocalDestination" + # discriminator: + # propertyName: type + # mapping: + # dockerhub: "#/components/schemas/DockerhubDestination" + # local: "#/components/schemas/LocalDestination" BaseDestination: type: object @@ -2793,6 +2793,12 @@ components: type: string ValueFromSecret: + type: object + properties: + secret: + $ref: "#/components/schemas/SecretRef" + + SecretRef: type: object properties: engine: diff --git a/src/engine/src/contrib/tapis/TapisPlugin.py b/src/engine/src/contrib/tapis/TapisPlugin.py index 21c598ea..640dc4b5 100644 --- a/src/engine/src/contrib/tapis/TapisPlugin.py +++ b/src/engine/src/contrib/tapis/TapisPlugin.py @@ -16,6 +16,7 @@ ) from contrib.tapis.middleware.event_handlers.archivers import TapisSystemArchiver from contrib.tapis.middleware.event_handlers.notifications import TapisWorkflowsAPIBackend +from contrib.tapis.middleware.engines import TapisSKSecretsEngine from contrib.tapis.executors import TapisActor, TapisJob from contrib.tapis.constants import TASK_TYPE_TAPIS_ACTOR, TASK_TYPE_TAPIS_JOB, ARCHIVER_TYPE_TAPIS_SYSTEM @@ -48,6 +49,7 @@ def __init__(self, name): ) self.register("task_executor", {TASK_TYPE_TAPIS_ACTOR: TapisActor}) self.register("task_executor", {TASK_TYPE_TAPIS_JOB: TapisJob}) + self.register("engine", {"tapis-security-kernal": TapisSKSecretsEngine}) self.register("schema_extension", SchemaExtension( _type="task_executor", sub_type="function", diff --git a/src/engine/src/contrib/tapis/middleware/engines/TapisSKSecretsEngine.py b/src/engine/src/contrib/tapis/middleware/engines/TapisSKSecretsEngine.py new file mode 100644 index 00000000..ce777ab1 --- /dev/null +++ b/src/engine/src/contrib/tapis/middleware/engines/TapisSKSecretsEngine.py @@ -0,0 +1,32 @@ +from contrib.tapis.helpers import TapisServiceAPIGateway + + +class TapisSKSecretsEngine: + def __init__(self): + + service_api_gateway = TapisServiceAPIGateway() + self.service_client = service_api_gateway.get_client() + + # self._kwargs = { + # "_tapis_set_x_headers_from_service": True, + # "_x_tapis_tenant": ctx.args["tapis_tenant_id"].value, + # "_x_tapis_user": ctx.args["tapis_pipeline_owner"].value, + # "_tapis_headers": { + # "X-WORKFLOW-EXECUTOR-TOKEN": ctx.args["workflow_executor_access_token"].value + # } + # } + + def __call__(self, tapis_tenant_id, sk_id): + try: + resp = self.service_client.sk.readSecret( + secretType="user", + secretName=sk_id, + user="workflows", + tenant=tapis_tenant_id, + version=0, + _tapis_set_x_headers_from_service=True + ) + # ctx.args["tapis_tenant_id"].value + return resp.secretMap.__dict__ + except Exception as e: + return None # TODO catch network error \ No newline at end of file diff --git a/src/engine/src/core/ioc/IOCContainerFactory.py b/src/engine/src/core/ioc/IOCContainerFactory.py index aedaef18..67ddaac0 100644 --- a/src/engine/src/core/ioc/IOCContainerFactory.py +++ b/src/engine/src/core/ioc/IOCContainerFactory.py @@ -1,3 +1,6 @@ +from typing import List + +from owe_python_sdk.Plugin import Plugin from core.ioc import IOCContainer from core.state import ReactiveState from core.daos import ( @@ -33,7 +36,7 @@ ) class IOCContainerFactory: - def build(self): + def build(self, plugins: List[Plugin] = []): container = IOCContainer() container.register("ReactiveState", @@ -152,7 +155,8 @@ def build(self): container.load("TaskRepository"), container.load("TaskOutputRepository"), container.load("ArgRepository"), - container.load("EnvRepository") + container.load("EnvRepository"), + plugins=plugins if len(plugins) > 0 else [] ) ) diff --git a/src/engine/src/core/tasks/TaskInputFileStagingService.py b/src/engine/src/core/tasks/TaskInputFileStagingService.py index 21cf0763..dc6fb658 100644 --- a/src/engine/src/core/tasks/TaskInputFileStagingService.py +++ b/src/engine/src/core/tasks/TaskInputFileStagingService.py @@ -3,6 +3,7 @@ from core.workflows import ValueFromService from owe_python_sdk.schema import Task from errors.tasks import TaskInputStagingError +from owe_python_sdk.utils import select_field class TaskInputFileStagingService: @@ -41,7 +42,7 @@ def stage(self, task: Task): ) except Exception as e: if input_.required: - raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}'") + raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}' | {str(e)}") if key == "args": try: value = self._value_from_service.get_arg_value_by_key( @@ -49,7 +50,7 @@ def stage(self, task: Task): ) except Exception as e: if input_.required: - raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}'") + raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}' | {str(e)}") if key == "env": try: value = self._value_from_service.get_env_value_by_key( @@ -57,8 +58,20 @@ def stage(self, task: Task): ) except Exception as e: if input_.required: - raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}'") - + raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}' | {str(e)}") + if key == "secret": + try: + value = select_field( + self._value_from_service.get_secret_value_by_engine_and_pk( + value_from[key].engine, + value_from[key].pk + ), + value_from[key].field_selector + ) + except Exception as e: + if input_.required: + raise TaskInputStagingError(f"Error attempting to fetch value from secret at key '{value_from[key]}' | {str(e)}") + self._create_input_(task, input_id, value) def _create_input_(self, task, input_id, value): diff --git a/src/engine/src/core/workflows/ValueFromService.py b/src/engine/src/core/workflows/ValueFromService.py index f1f9535a..2f674ba7 100644 --- a/src/engine/src/core/workflows/ValueFromService.py +++ b/src/engine/src/core/workflows/ValueFromService.py @@ -1,3 +1,6 @@ +from typing import List + +from owe_python_sdk.Plugin import Plugin from core.repositories import ( TaskOutputRepository, TaskRepository, @@ -5,19 +8,20 @@ ArgRepository ) - class ValueFromService: def __init__( self, task_repo: TaskRepository, task_output_repo: TaskOutputRepository, arg_repo: ArgRepository, - env_repo: EnvRepository + env_repo: EnvRepository, + plugins: List[Plugin] = [] ): self._task_repo = task_repo self._task_output_repo = task_output_repo self._arg_repo = arg_repo self._env_repo = env_repo + self._plugins = plugins def get_task_output_value_by_id(self, task_id, _id): task = self._task_repo.get_by_id(task_id) @@ -31,4 +35,12 @@ def get_env_value_by_key(self, key): def get_arg_value_by_key(self, key): value = self._arg_repo.get_value_by_key(key) return value - \ No newline at end of file + + # Finds the first secrets engine from plugins and fetches the secret + def get_secret_value_by_engine_and_pk(self, engine, pk): + plugin = next(filter( + lambda p: engine in p.engines, + self._plugins + )) + secrets_engine = plugin.engines[engine] + return secrets_engine(pk) \ No newline at end of file diff --git a/src/engine/src/owe_python_sdk/Plugin.py b/src/engine/src/owe_python_sdk/Plugin.py index e279f508..f969cfaf 100644 --- a/src/engine/src/owe_python_sdk/Plugin.py +++ b/src/engine/src/owe_python_sdk/Plugin.py @@ -3,7 +3,7 @@ from owe_python_sdk.middleware.ArchiveMiddleware import ArchiveMiddleware from owe_python_sdk.SchemaExtension import SchemaExtension -MIDDLEWARE_TYPES = [ "request", "archive", "notification_handler", "task_executor", "schema_extension" ] +MIDDLEWARE_TYPES = [ "request", "archive", "notification_handler", "task_executor", "schema_extension", "engine" ] class Plugin: def __init__(self, name): @@ -12,6 +12,7 @@ def __init__(self, name): self.archive_middlewares = [] self.notification_middlewares = [] self.task_executors = {} + self.engines = {} self.schema_extensions = [] def register(self, _type, middleware): @@ -36,6 +37,10 @@ def register(self, _type, middleware): # TODO more logic to ensure key(s) is a string and val is a # subclass (not an instance) of TaskExecutor self.task_executors = {**self.task_executors, **middleware} + elif _type == "engine": + if type(middleware) != dict: + raise Exception(f"Middleware Registration Error: Secrets Engine middleware must be registered as a dict in which the key is a unique identifier and value is the concrete class that inherits from Middleware(not an intance)") + self.engines = {**self.engines, **middleware} elif _type == "schema_extension": if type(middleware) != SchemaExtension: raise Exception(f"Schema Extension Registration Error: Expected type 'SchemaExtension' | Recieved {type(middleware)}") @@ -46,9 +51,8 @@ def dispatch(self, _type, ctx): raise Exception(f"Invalid Middleware type: Recieved '{_type}' | Expected oneOf {MIDDLEWARE_TYPES}") if _type == "request": - request = ctx for request_middleware in self.request_middlewares: - request = request_middleware(request) + request = request_middleware(ctx) # TODO include logic for dispatching event handlers diff --git a/src/engine/src/owe_python_sdk/utils.py b/src/engine/src/owe_python_sdk/utils.py index a55e2a37..050f41c1 100644 --- a/src/engine/src/owe_python_sdk/utils.py +++ b/src/engine/src/owe_python_sdk/utils.py @@ -1,3 +1,6 @@ +from typing import Union, List + + def adapt_runner(Runner, RunnerClassExtension: type): def __init__(self, *args, **kwargs): Runner.__init__(self) @@ -32,4 +35,25 @@ def get_schema_extensions(plugins, _type, sub_type=None): and (ext.sub_type == sub_type or sub_type == None) ] - return schema_extensions \ No newline at end of file + return schema_extensions + +def select_key_or_index(target: Union[dict, list], key_or_index: Union[str, int]): + if type(target) == list: + return target[int(key_or_index)] + + if type(target) == dict: + return target[key_or_index] + + raise TypeError("Argument `target` must be of type `dict` or `list`") + + +def select_field(obj: Union[dict, list], selectors: list[str] = []): + if len(selectors) == 0: + return obj + + selector = selectors[0] + if len(selectors) == 1: + return select_key_or_index(obj, selector) + + selected_obj = select_key_or_index(obj, selector) + return select_field(selected_obj, selectors[1:]) \ No newline at end of file diff --git a/src/engine/src/tests/TestSDKUtils.py b/src/engine/src/tests/TestSDKUtils.py new file mode 100644 index 00000000..cf65e914 --- /dev/null +++ b/src/engine/src/tests/TestSDKUtils.py @@ -0,0 +1,47 @@ +import unittest + +from owe_python_sdk.utils import select_field + + +class TestSDKUtils(unittest.TestCase): + + def testSelectField(self): + # Providing no field selectors should just return the object + self.assertTrue(select_field({}, []) == {}) + self.assertTrue(select_field(1, []) == 1) + + obj = { + "key": [ + "arr_item", + { + "shallow_key": "shallow_value", + "deep_key": { + "deeper_key": "deeper_value" + } + } + ] + } + self.assertTrue(type(select_field(obj, ["key"])) == list) + self.assertTrue(select_field(obj, ["key", 0]) == "arr_item") + self.assertTrue(select_field(obj, ["key", 1, "shallow_key"]) == "shallow_value") + self.assertTrue(select_field(obj, ["key", 1, "deep_key", "deeper_key"]) == "deeper_value") + + # Must throw type error if object provided is not of type dict or list + self.assertRaises(TypeError, lambda: select_field(1, ["test"])) + + # Must throw index error if attempting to access a non-existent index + # In the case below, there is no 3rd item in the list + self.assertRaises(IndexError, lambda: select_field(obj, ["key", 2])) + + arr = [ + "arr_item", + {"key": "value"} + ] + self.assertTrue(select_field(arr, [0]) == "arr_item") + self.assertTrue(select_field(arr, [1, "key"]) == "value") + +if __name__ == "__main__": + unittest.main() + + + diff --git a/src/engine/src/tests/run.sh b/src/engine/src/tests/run.sh index 4bb384b3..24da7311 100755 --- a/src/engine/src/tests/run.sh +++ b/src/engine/src/tests/run.sh @@ -1,7 +1,9 @@ cd $(dirname $0) cd ../ # TODO return with non-zero exit code if a test fails -python3 -m unittest -v tests.TestServer +# python3 -m unittest -v tests.TestServer + python3 -m unittest -v tests.TestConditionalExpressionEvaluator python3 -m unittest -v tests.TestIOCContainerFactory -python3 -m unittest -v tests.TestTaskRepository \ No newline at end of file +python3 -m unittest -v tests.TestTaskRepository +python3 -m unittest -v tests.TestSDKUtils \ No newline at end of file