Skip to content

Commit

Permalink
Major refactor. Introduce IOC container, Repos, data mappers, daos, unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Dec 4, 2023
1 parent d4d82ad commit 012cb7d
Show file tree
Hide file tree
Showing 47 changed files with 788 additions and 113 deletions.
2 changes: 0 additions & 2 deletions src/api/src/backend/services/TaskService.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ def delete(self, tasks: List[Task]):
raise ServerError(message=str(e))

def _recursive_pydantic_model_to_dict(self, obj):
print("OBJ:", obj, flush=True)
if type(obj) in [list, tuple]:
items = type(obj)()
for item in obj:
Expand All @@ -318,7 +317,6 @@ def _recursive_pydantic_model_to_dict(self, obj):
return modified_dict
if isinstance(obj, BaseModel):
dict_obj = obj.dict()
print("IS INSTANCE", dict_obj, flush=True)
modified_dict = {}
for key in dict_obj:
modified_dict[key] = self._recursive_pydantic_model_to_dict(dict_obj[key])
Expand Down
111 changes: 111 additions & 0 deletions src/engine/src/conf/Configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import os

from pathlib import Path

from owe_python_sdk.schema import EnumTaskFlavor


class Configuration:
    """Central runtime configuration for the workflow engine.

    All values are resolved once at construction time from environment
    variables, well-known file locations, and hard-coded defaults.
    """

    def __init__(self):
        # Absolute path of the engine source root (parent of the conf/ dir).
        self.BASE_DIR = str(Path(__file__).resolve().parent.parent) + "/"
        self.OWE_PYTHON_SDK_DIR = os.path.join(self.BASE_DIR, "owe_python_sdk")

        # True only when the IS_LOCAL env var is exactly the string 'true'.
        self.IS_LOCAL = os.environ.get("IS_LOCAL", 'false') == 'true'

        # PipelineExecutor configs
        self.MAX_CONNECTION_ATTEMPTS = 24
        self.CONNECTION_RETRY_DELAY = 5

        self.INSUFFICIENT_WORKER_RETRY_DELAY = 10

        # STARTING_WORKERS = os.environ.get("MIN_WORKERS", None) or 2
        self.STARTING_WORKERS = 100
        # MAX_WORKERS = os.environ.get("MAX_WORKERS", None) or 10
        self.MAX_WORKERS = 200

        # Exchanges
        self.INBOUND_EXCHANGE = "workflows"
        self.RETRY_EXCHANGE = "retry"
        self.DEAD_LETTER_EXCHANGE = "deadletter"
        self.DEFERRED_EXCHANGE = "deferred"

        # Queues
        self.INBOUND_QUEUE = "workflows"
        self.RETRY_QUEUE = "retry"
        self.DEAD_LETTER_QUEUE = "deadletter"
        self.DEFERRED_QUEUE = "deferred"

        self.BASE_WORK_DIR = "/var/lib/open-workflow-engine/"

        self.LOG_FILE = self.BASE_DIR + "logs/service.log"

        # NOTE: previously assigned twice with the same value; one
        # assignment suffices.
        self.LOG_LEVEL = os.environ.get("LOG_LEVEL", None)

        # Message broker connection settings. None when the env vars are
        # unset (the URL below will then contain the literal "None").
        self.BROKER_USER = os.environ.get("BROKER_USER", None)
        self.BROKER_PASSWORD = os.environ.get("BROKER_PASSWORD", None)
        self.BROKER_HOST = os.environ.get("BROKER_URL", None)
        self.BROKER_PORT = os.environ.get("BROKER_PORT", None)

        # Database connection settings.
        self.DB_USER = os.environ.get("DB_USER", None)
        self.DB_PASSWORD = os.environ.get("DB_PASSWORD", None)
        self.DB_HOST = os.environ.get("DB_HOST", None)
        self.DB_PORT = os.environ.get("DB_PORT", None)
        self.DB_NAME = os.environ.get("DB_NAME", None)

        self.BROKER_URL = f"amqp://{self.BROKER_USER}:{self.BROKER_PASSWORD}@{self.BROKER_HOST}:{self.BROKER_PORT}"
        self.BACKEND_URL = f"db+mysql://{self.DB_USER}:{self.DB_PASSWORD}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"

        # Read the kubernetes namespace from the serviceaccount namespace
        # file; fall back to "default" when not running in a cluster.
        # BUG FIX: the previous `finally` block unconditionally overwrote
        # the value with "default" even when the read succeeded. Also use
        # a context manager so the file handle is closed.
        try:
            with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as file:
                self.KUBERNETES_NAMESPACE = file.read()
        except Exception:
            self.KUBERNETES_NAMESPACE = "default"

        self.WORKFLOW_NFS_SERVER = os.environ.get("WORKFLOW_NFS_SERVER")

        # Polling intervals in seconds
        self.DEFAULT_POLLING_INTERVAL = 1
        self.MIN_POLLING_INTERVAL = 1
        self.MAX_POLLING_INTERVAL = 3600

        # Duplicate submission policy enums
        self.DUPLICATE_SUBMISSION_POLICY_TERMINATE = "terminate"
        self.DUPLICATE_SUBMISSION_POLICY_ALLOW = "allow"
        self.DUPLICATE_SUBMISSION_POLICY_DENY = "deny"
        self.DUPLICATE_SUBMISSION_POLICY_DEFER = "defer"

        # Execution profile defaults. Max exec time is 24 hours in seconds.
        self.DEFAULT_MAX_EXEC_TIME = 60 * 60 * 24
        self.DEFAULT_INVOCATION_MODE = "async"
        self.DEFAULT_RETRY_POLICY = "exponential_backoff"
        self.DEFAULT_DUPLICATE_SUBMISSION_POLICY = "terminate"
        self.DEFAULT_MAX_RETRIES = 0

        # Image tags and urls
        self.KANIKO_IMAGE_URL = "gcr.io/kaniko-project/executor"
        # KANIKO_IMAGE_TAG = "debug"
        self.KANIKO_IMAGE_TAG = "latest"

        # Resource allotments per task flavor.
        self.FLAVORS = {
            # EnumTaskFlavor.C1_XTINY: {"cpu": "1", "memory": ".01G", "disk": "1GB"},
            EnumTaskFlavor.C1_TINY: {"cpu": "1", "memory": ".5G", "disk": "20GB"},
            EnumTaskFlavor.C1_XXSML: {"cpu": "1", "memory": "1G", "disk": "20GB"},
            EnumTaskFlavor.C1_XSML: {"cpu": "1", "memory": "2G", "disk": "20GB"},
            EnumTaskFlavor.C1_SML: {"cpu": "1", "memory": "4G", "disk": "20GB"},
            EnumTaskFlavor.C1_MED: {"cpu": "2", "memory": "8G", "disk": "20GB"},
            EnumTaskFlavor.C1_LRG: {"cpu": "4", "memory": "16G", "disk": "20GB"},
            EnumTaskFlavor.C1_XLRG: {"cpu": "8", "memory": "32G", "disk": "20GB"},
            EnumTaskFlavor.C1_XXLRG: {"cpu": "16", "memory": "64G", "disk": "20GB"},
            EnumTaskFlavor.G1_NVD_SML: {"gpu": "1", "gpu_type": "nvidia.com/gpu", "memory": "4G", "disk": "20GB"},
            EnumTaskFlavor.G1_NVD_MED: {"gpu": "2", "gpu_type": "nvidia.com/gpu", "memory": "8G", "disk": "20GB"},
            # NOTE(review): "6G" breaks the doubling pattern of the other
            # flavors (4G, 8G, ...); possibly meant "16G" — confirm.
            EnumTaskFlavor.G1_NVD_LRG: {"gpu": "4", "gpu_type": "nvidia.com/gpu", "memory": "6G", "disk": "20GB"}
        }

        self.SINGULARITY_IMAGE_URL = "quay.io/singularity/singularity"
        self.SINGULARITY_IMAGE_TAG = "v3.10.0"

        self.PLUGINS = [ "contrib/tapis" ]
29 changes: 17 additions & 12 deletions src/engine/src/conf/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,31 @@

LOG_FILE = BASE_DIR + "logs/service.log"

LOG_LEVEL = os.environ["LOG_LEVEL"]
LOG_LEVEL = os.environ.get("LOG_LEVEL", None)

BROKER_USER = os.environ["BROKER_USER"]
BROKER_PASSWORD = os.environ["BROKER_PASSWORD"]
BROKER_HOST = os.environ["BROKER_URL"]
BROKER_PORT = os.environ["BROKER_PORT"]
BROKER_USER = os.environ.get("BROKER_USER", None)
BROKER_PASSWORD = os.environ.get("BROKER_PASSWORD", None)
BROKER_HOST = os.environ.get("BROKER_URL", None)
BROKER_PORT = os.environ.get("BROKER_PORT", None)

DB_USER = os.environ["DB_USER"]
DB_PASSWORD = os.environ["DB_PASSWORD"]
DB_HOST = os.environ["DB_HOST"]
DB_PORT = os.environ["DB_PORT"]
DB_NAME = os.environ["DB_NAME"]
DB_USER = os.environ.get("DB_USER", None)
DB_PASSWORD = os.environ.get("DB_PASSWORD", None)
DB_HOST = os.environ.get("DB_HOST", None)
DB_PORT = os.environ.get("DB_PORT", None)
DB_NAME = os.environ.get("DB_NAME", None)

LOG_LEVEL = os.environ["LOG_LEVEL"]
LOG_LEVEL = os.environ.get("LOG_LEVEL", None)

BROKER_URL = f"amqp://{BROKER_USER}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}"
BACKEND_URL = f"db+mysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

# Read the kubernetes namespace from the serviceaccount namespace directly
KUBERNETES_NAMESPACE = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read()
try:
KUBERNETES_NAMESPACE = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read()
except Exception:
pass
finally:
KUBERNETES_NAMESPACE = "default"

WORKFLOW_NFS_SERVER = os.environ.get("WORKFLOW_NFS_SERVER")

Expand Down
15 changes: 15 additions & 0 deletions src/engine/src/core/daos/FileSystemDAO.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import os


class FileSystemDAO:
    """Data-access object for reading files from the local filesystem."""

    def get(self, path: str):
        """Return the entire text content of the file at *path*.

        Raises:
            FileNotFoundError: when *path* is not an existing regular
                file. (A subclass of the bare Exception previously
                raised, so existing `except Exception` callers still
                work, while new callers can catch it precisely.)
        """
        # Fail fast with a clear message instead of a bare open() error.
        if not os.path.isfile(path):
            raise FileNotFoundError(f"File {path} not found")

        # Context manager guarantees the handle is closed after reading.
        with open(path, "r") as file:
            return file.read()
9 changes: 9 additions & 0 deletions src/engine/src/core/daos/WorkflowExecutorStateDAO.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from core.state import ReactiveState


class WorkflowExecutorStateDAO:
    """Thin access object exposing a workflow executor's ReactiveState."""

    def __init__(self, state: ReactiveState):
        # Hold the injected state object for later retrieval.
        self._state = state

    def get_state(self):
        """Return the ReactiveState instance supplied at construction."""
        return self._state
2 changes: 2 additions & 0 deletions src/engine/src/core/daos/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from core.daos.FileSystemDAO import FileSystemDAO
from core.daos.WorkflowExecutorStateDAO import WorkflowExecutorStateDAO
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,27 @@
ConditionalExpressions
)

from core.expressions import OperandResolver


class ConditionalExpressionEvaluator:
def __init__(self):
def __init__(
self,
operand_resolver: OperandResolver
):
self._comparison_operators = list(get_args(ComparisonOperator))
self._logical_operators = list(get_args(LogicalOperator))
self._membership_operators = list(get_args(MembershipOperator))
self._operand_resolver = operand_resolver

def evaluate_all(self, conditions: ConditionalExpressions, ctx=None):
def evaluate_all(self, conditions: ConditionalExpressions):
evaluations = []
for condition in conditions:
evaluations.append(self.evaluate(condition, ctx=ctx))
evaluations.append(self.evaluate(condition))

return all(evaluations)

def evaluate(self, condition: ConditionalExpression, ctx=None):
def evaluate(self, condition: ConditionalExpression):
operator = list(condition.keys())[0] # There will only ever be one key in a condition.
operands = condition[operator]
if operator in self._comparison_operators:
Expand All @@ -40,10 +46,19 @@ def evaluate(self, condition: ConditionalExpression, ctx=None):
return self._logical(operator, operands)

def _comparison(self, operator, operands):
return getattr(operatorlib, operator)(operands[0], operands[1])
resolved_operands = [
self._operand_resolver.resolve(operand)
for operand in operands
]
return getattr(operatorlib, operator)(resolved_operands[0], resolved_operands[1])

def _membership(self, _, operands):
return operands[0] in operands[1]
resolved_needle = self._operand_resolver.resolve(operands[0])
resolved_haystack = [
self._operand_resolver.resolve(operand)
for operand in operands[1]
]
return resolved_needle in resolved_haystack

def _logical(self, operator, operands):
evaluations = []
Expand Down
36 changes: 36 additions & 0 deletions src/engine/src/core/expressions/OperandResolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from owe_python_sdk.schema import Operand

from core.workflows import ValueFromService


class OperandResolver:
    """Resolves conditional-expression operands to concrete values.

    Literal operands (anything that is not a dict) are returned
    unchanged. Dict operands name a value source — "task_output",
    "args", or "env" — and are resolved through the injected
    ValueFromService.

    Annotations are written as string forward references so this module
    does not need the SDK types resolved at definition time.
    """

    def __init__(
        self,
        value_from_service: "ValueFromService"
    ):
        self._value_from_service = value_from_service

    def resolve(self, operand: "Operand"):
        """Return the concrete value for *operand*.

        Returns None for a dict operand whose single key is not one of
        the recognized sources (same fall-through as before).
        """
        if not isinstance(operand, dict):
            return operand

        # NOTE all operands should have only 1 key.
        # BUG FIX: `operand.keys[0]` subscripted the dict.keys *method*
        # and raised TypeError; take the first (only) key instead.
        key = next(iter(operand))

        if key == "task_output":
            return self._value_from_service.get_task_output_value_by_id(
                task_id=operand[key]["task_id"],
                _id=operand[key]["output_id"]
            )

        if key == "args":
            return self._value_from_service.get_arg_value_by_key(
                operand[key]["args"]
            )

        if key == "env":
            # NOTE(review): this reads operand["env"]["args"], which looks
            # like a copy-paste of the "args" branch — confirm whether the
            # schema key should be "env" here.
            return self._value_from_service.get_env_value_by_key(
                operand[key]["args"]
            )
2 changes: 2 additions & 0 deletions src/engine/src/core/expressions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from core.expressions.ConditionalExpressionEvaluator import ConditionalExpressionEvaluator
from core.expressions.OperandResolver import OperandResolver
30 changes: 30 additions & 0 deletions src/engine/src/core/ioc/IOCContainer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
class IOCContainer:
    """Minimal inversion-of-control container.

    Factories ("handlers") are registered under a key and invoked
    lazily on load(). Objects registered as singletons are built once
    and cached for subsequent loads.
    """

    def __init__(self):
        # key -> {"handler": factory callable, "as_singleton": bool}
        self._configurations = {}
        # Cache of already-built singleton instances, keyed like above.
        self._cache = {}

    def register(self, key, handler: callable, as_singleton=False):
        """Register *handler* as the factory for *key*.

        When *as_singleton* is True, the first load() result is cached
        and returned on every subsequent load().
        """
        self._configurations[key] = {
            "handler": handler,
            "as_singleton": as_singleton
        }

    def load(self, key, *args, **kwargs):
        """Build (or fetch the cached singleton for) the object at *key*.

        Extra positional/keyword arguments are forwarded to the handler.
        This implements the forwarding the original NOTE described as
        unimplemented; calling with no extra arguments behaves exactly
        as before, so existing callers are unaffected.

        Raises:
            Exception: when no handler was registered under *key*.
        """
        if key in self._cache:
            return self._cache[key]

        configuration = self._configurations.get(key)
        if configuration is None:  # identity check, not ==
            raise Exception(f"No object registered with key {key}")

        obj = configuration["handler"](*args, **kwargs)
        if configuration["as_singleton"]:
            self._cache[key] = obj

        return obj



Loading

0 comments on commit 012cb7d

Please sign in to comment.