Skip to content

Commit

Permalink
Add secrets engine to workflow engine
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Sep 9, 2024
1 parent ceb147a commit ad79849
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 20 deletions.
16 changes: 11 additions & 5 deletions src/api/specs/WorkflowsAPI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2542,11 +2542,11 @@ components:
anyOf:
- $ref: '#/components/schemas/DockerhubDestination'
- $ref: '#/components/schemas/LocalDestination'
discriminator:
propertyName: type
mapping:
dockerhub: "#/components/schemas/DockerhubDestination"
local: "#/components/schemas/LocalDestination"
# discriminator:
# propertyName: type
# mapping:
# dockerhub: "#/components/schemas/DockerhubDestination"
# local: "#/components/schemas/LocalDestination"

BaseDestination:
type: object
Expand Down Expand Up @@ -2793,6 +2793,12 @@ components:
type: string

ValueFromSecret:
type: object
properties:
secret:
$ref: "#/components/schemas/SecretRef"

SecretRef:
type: object
properties:
engine:
Expand Down
2 changes: 2 additions & 0 deletions src/engine/src/contrib/tapis/TapisPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from contrib.tapis.middleware.event_handlers.archivers import TapisSystemArchiver
from contrib.tapis.middleware.event_handlers.notifications import TapisWorkflowsAPIBackend
from contrib.tapis.middleware.engines import TapisSKSecretsEngine
from contrib.tapis.executors import TapisActor, TapisJob
from contrib.tapis.constants import TASK_TYPE_TAPIS_ACTOR, TASK_TYPE_TAPIS_JOB, ARCHIVER_TYPE_TAPIS_SYSTEM

Expand Down Expand Up @@ -48,6 +49,7 @@ def __init__(self, name):
)
self.register("task_executor", {TASK_TYPE_TAPIS_ACTOR: TapisActor})
self.register("task_executor", {TASK_TYPE_TAPIS_JOB: TapisJob})
self.register("engine", {"tapis-security-kernal": TapisSKSecretsEngine})
self.register("schema_extension", SchemaExtension(
_type="task_executor",
sub_type="function",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from contrib.tapis.helpers import TapisServiceAPIGateway


class TapisSKSecretsEngine:
def __init__(self):

service_api_gateway = TapisServiceAPIGateway()
self.service_client = service_api_gateway.get_client()

# self._kwargs = {
# "_tapis_set_x_headers_from_service": True,
# "_x_tapis_tenant": ctx.args["tapis_tenant_id"].value,
# "_x_tapis_user": ctx.args["tapis_pipeline_owner"].value,
# "_tapis_headers": {
# "X-WORKFLOW-EXECUTOR-TOKEN": ctx.args["workflow_executor_access_token"].value
# }
# }

def __call__(self, tapis_tenant_id, sk_id):
try:
resp = self.service_client.sk.readSecret(
secretType="user",
secretName=sk_id,
user="workflows",
tenant=tapis_tenant_id,
version=0,
_tapis_set_x_headers_from_service=True
)
# ctx.args["tapis_tenant_id"].value
return resp.secretMap.__dict__
except Exception as e:
return None # TODO catch network error
8 changes: 6 additions & 2 deletions src/engine/src/core/ioc/IOCContainerFactory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from typing import List

from owe_python_sdk.Plugin import Plugin
from core.ioc import IOCContainer
from core.state import ReactiveState
from core.daos import (
Expand Down Expand Up @@ -33,7 +36,7 @@
)

class IOCContainerFactory:
def build(self):
def build(self, plugins: List[Plugin] = []):
container = IOCContainer()

container.register("ReactiveState",
Expand Down Expand Up @@ -152,7 +155,8 @@ def build(self):
container.load("TaskRepository"),
container.load("TaskOutputRepository"),
container.load("ArgRepository"),
container.load("EnvRepository")
container.load("EnvRepository"),
plugins=plugins if len(plugins) > 0 else []
)
)

Expand Down
21 changes: 17 additions & 4 deletions src/engine/src/core/tasks/TaskInputFileStagingService.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from core.workflows import ValueFromService
from owe_python_sdk.schema import Task
from errors.tasks import TaskInputStagingError
from owe_python_sdk.utils import select_field


class TaskInputFileStagingService:
Expand Down Expand Up @@ -41,24 +42,36 @@ def stage(self, task: Task):
)
except Exception as e:
if input_.required:
raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}'")
raise TaskInputStagingError(f"No output found for task '{value_from[key].task_id}' with output id of '{value_from[key].output_id}' | {str(e)}")
if key == "args":
try:
value = self._value_from_service.get_arg_value_by_key(
value_from[key]
)
except Exception as e:
if input_.required:
raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}'")
raise TaskInputStagingError(f"Error attempting to fetch value from args at key '{value_from[key]}' | {str(e)}")
if key == "env":
try:
value = self._value_from_service.get_env_value_by_key(
value_from[key]
)
except Exception as e:
if input_.required:
raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}'")

raise TaskInputStagingError(f"Error attempting to fetch value from env at key '{value_from[key]}' | {str(e)}")
if key == "secret":
try:
value = select_field(
self._value_from_service.get_secret_value_by_engine_and_pk(
value_from[key].engine,
value_from[key].pk
),
value_from[key].field_selector
)
except Exception as e:
if input_.required:
raise TaskInputStagingError(f"Error attempting to fetch value from secret at key '{value_from[key]}' | {str(e)}")

self._create_input_(task, input_id, value)

def _create_input_(self, task, input_id, value):
Expand Down
18 changes: 15 additions & 3 deletions src/engine/src/core/workflows/ValueFromService.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
from typing import List

from owe_python_sdk.Plugin import Plugin
from core.repositories import (
TaskOutputRepository,
TaskRepository,
EnvRepository,
ArgRepository
)


class ValueFromService:
def __init__(
self,
task_repo: TaskRepository,
task_output_repo: TaskOutputRepository,
arg_repo: ArgRepository,
env_repo: EnvRepository
env_repo: EnvRepository,
plugins: List[Plugin] = []
):
self._task_repo = task_repo
self._task_output_repo = task_output_repo
self._arg_repo = arg_repo
self._env_repo = env_repo
self._plugins = plugins

def get_task_output_value_by_id(self, task_id, _id):
task = self._task_repo.get_by_id(task_id)
Expand All @@ -31,4 +35,12 @@ def get_env_value_by_key(self, key):
def get_arg_value_by_key(self, key):
value = self._arg_repo.get_value_by_key(key)
return value


# Finds the first secrets engine from plugins and fetches the secret
def get_secret_value_by_engine_and_pk(self, engine, pk):
plugin = next(filter(
lambda p: engine in p.engines,
self._plugins
))
secrets_engine = plugin.engines[engine]
return secrets_engine(pk)
10 changes: 7 additions & 3 deletions src/engine/src/owe_python_sdk/Plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from owe_python_sdk.middleware.ArchiveMiddleware import ArchiveMiddleware
from owe_python_sdk.SchemaExtension import SchemaExtension

MIDDLEWARE_TYPES = [ "request", "archive", "notification_handler", "task_executor", "schema_extension" ]
MIDDLEWARE_TYPES = [ "request", "archive", "notification_handler", "task_executor", "schema_extension", "engine" ]

class Plugin:
def __init__(self, name):
Expand All @@ -12,6 +12,7 @@ def __init__(self, name):
self.archive_middlewares = []
self.notification_middlewares = []
self.task_executors = {}
self.engines = {}
self.schema_extensions = []

def register(self, _type, middleware):
Expand All @@ -36,6 +37,10 @@ def register(self, _type, middleware):
# TODO more logic to ensure key(s) is a string and val is a
# subclass (not an instance) of TaskExecutor
self.task_executors = {**self.task_executors, **middleware}
elif _type == "engine":
if type(middleware) != dict:
raise Exception(f"Middleware Registration Error: Secrets Engine middleware must be registered as a dict in which the key is a unique identifier and value is the concrete class that inherits from Middleware(not an intance)")
self.engines = {**self.engines, **middleware}
elif _type == "schema_extension":
if type(middleware) != SchemaExtension:
raise Exception(f"Schema Extension Registration Error: Expected type 'SchemaExtension' | Recieved {type(middleware)}")
Expand All @@ -46,9 +51,8 @@ def dispatch(self, _type, ctx):
raise Exception(f"Invalid Middleware type: Recieved '{_type}' | Expected oneOf {MIDDLEWARE_TYPES}")

if _type == "request":
request = ctx
for request_middleware in self.request_middlewares:
request = request_middleware(request)
request = request_middleware(ctx)

# TODO include logic for dispatching event handlers

Expand Down
26 changes: 25 additions & 1 deletion src/engine/src/owe_python_sdk/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from typing import Union, List


def adapt_runner(Runner, RunnerClassExtension: type):
def __init__(self, *args, **kwargs):
Runner.__init__(self)
Expand Down Expand Up @@ -32,4 +35,25 @@ def get_schema_extensions(plugins, _type, sub_type=None):
and (ext.sub_type == sub_type or sub_type == None)
]

return schema_extensions
return schema_extensions

def select_key_or_index(target: Union[dict, list], key_or_index: Union[str, int]):
if type(target) == list:
return target[int(key_or_index)]

if type(target) == dict:
return target[key_or_index]

raise TypeError("Argument `target` must be of type `dict` or `list`")


def select_field(obj: Union[dict, list], selectors: list[str] = []):
if len(selectors) == 0:
return obj

selector = selectors[0]
if len(selectors) == 1:
return select_key_or_index(obj, selector)

selected_obj = select_key_or_index(obj, selector)
return select_field(selected_obj, selectors[1:])
47 changes: 47 additions & 0 deletions src/engine/src/tests/TestSDKUtils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import unittest

from owe_python_sdk.utils import select_field


class TestSDKUtils(unittest.TestCase):

def testSelectField(self):
# Providing no field selectors should just return the object
self.assertTrue(select_field({}, []) == {})
self.assertTrue(select_field(1, []) == 1)

obj = {
"key": [
"arr_item",
{
"shallow_key": "shallow_value",
"deep_key": {
"deeper_key": "deeper_value"
}
}
]
}
self.assertTrue(type(select_field(obj, ["key"])) == list)
self.assertTrue(select_field(obj, ["key", 0]) == "arr_item")
self.assertTrue(select_field(obj, ["key", 1, "shallow_key"]) == "shallow_value")
self.assertTrue(select_field(obj, ["key", 1, "deep_key", "deeper_key"]) == "deeper_value")

# Must throw type error if object provided is not of type dict or list
self.assertRaises(TypeError, lambda: select_field(1, ["test"]))

# Must throw index error if attempting to access a non-existent index
# In the case below, there is no 3rd item in the list
self.assertRaises(IndexError, lambda: select_field(obj, ["key", 2]))

arr = [
"arr_item",
{"key": "value"}
]
self.assertTrue(select_field(arr, [0]) == "arr_item")
self.assertTrue(select_field(arr, [1, "key"]) == "value")

if __name__ == "__main__":
unittest.main()



6 changes: 4 additions & 2 deletions src/engine/src/tests/run.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
cd $(dirname $0)
cd ../
# TODO return with non-zero exit code if a test fails
python3 -m unittest -v tests.TestServer
# python3 -m unittest -v tests.TestServer

python3 -m unittest -v tests.TestConditionalExpressionEvaluator
python3 -m unittest -v tests.TestIOCContainerFactory
python3 -m unittest -v tests.TestTaskRepository
python3 -m unittest -v tests.TestTaskRepository
python3 -m unittest -v tests.TestSDKUtils

0 comments on commit ad79849

Please sign in to comment.