From 68b7d33f55b8f447ab4bff47f2046dcfe86910ff Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Tue, 2 Jul 2024 09:06:07 +0530 Subject: [PATCH] fix Signed-off-by: Abhishek Kumar --- config/systems.json | 4 +- sebs/knative/config.py | 30 ---- sebs/knative/function.py | 93 +----------- sebs/knative/knative.py | 144 +++++++++++++++++-- sebs/knative/trigger.py | 295 --------------------------------------- sebs/knative/triggers.py | 106 ++++++++++++++ 6 files changed, 248 insertions(+), 424 deletions(-) delete mode 100644 sebs/knative/trigger.py create mode 100644 sebs/knative/triggers.py diff --git a/config/systems.json b/config/systems.json index 239d2016..8701150f 100644 --- a/config/systems.json +++ b/config/systems.json @@ -252,7 +252,9 @@ "handler.py", "storage.py" ], - "packages": [] + "packages": { + "parliament-functions": "0.1.0" + } } }, "nodejs": { diff --git a/sebs/knative/config.py b/sebs/knative/config.py index ffa023c0..c9ddbb2c 100644 --- a/sebs/knative/config.py +++ b/sebs/knative/config.py @@ -6,15 +6,6 @@ from typing import cast, Optional -class KnativeCredentials(Credentials): - @staticmethod - def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Credentials: - return KnativeCredentials() - - def serialize(self) -> dict: - return {} - - class KnativeResources(Resources): def __init__( self, @@ -164,24 +155,14 @@ def serialize(self) -> dict: class KnativeConfig(Config): name: str - shutdownStorage: bool cache: Cache def __init__(self, config: dict, cache: Cache): super().__init__(name="knative") - self._credentials = KnativeCredentials() self._resources = KnativeResources() - self.shutdownStorage = config["shutdownStorage"] - self.removeCluster = config["removeCluster"] self.knative_exec = config["knativeExec"] - self.knative_bypass_security = config["knativeBypassSecurity"] - self.experimentalManifest = config["experimentalManifest"] self.cache = cache - @property - def credentials(self) -> KnativeCredentials: - return self._credentials - @property def resources(self) -> KnativeResources: return self._resources @@ -194,12 +175,7 @@ def serialize(self) -> dict: return { "name": self._name, "region": self._region, - "shutdownStorage": self.shutdownStorage, - "removeCluster": self.removeCluster, "knativeExec": self.knative_exec, - "knativeBypassSecurity": self.knative_bypass_security, - "experimentalManifest": self.experimentalManifest, - "credentials": self._credentials.serialize(), "resources": self._resources.serialize(), } @@ -216,11 +192,5 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config return res def update_cache(self, cache: Cache): - cache.update_config(val=self.shutdownStorage, keys=["knative", "shutdownStorage"]) - cache.update_config(val=self.removeCluster, keys=["knative", "removeCluster"]) cache.update_config(val=self.knative_exec, keys=["knative", "knativeExec"]) - cache.update_config(val=self.knative_bypass_security, keys=["knative", "knativeBypassSecurity"]) - cache.update_config( - val=self.experimentalManifest, keys=["knative", "experimentalManifest"] - ) self.resources.update_cache(cache) diff --git a/sebs/knative/function.py b/sebs/knative/function.py index aa562c6e..404f4da4 100644 --- a/sebs/knative/function.py +++ b/sebs/knative/function.py @@ -6,134 +6,53 @@ from sebs.benchmark import Benchmark from sebs.faas.function import Function, FunctionConfig, Runtime from sebs.storage.config import MinioConfig -from sebs.knative.trigger import LibraryTrigger, HTTPTrigger - @dataclass class KnativeFunctionConfig(FunctionConfig): - """ - Configuration class for Knative function specific configurations. - - Attributes: - docker_image (str): Docker image for the function. - namespace (str): Kubernetes namespace where the function is deployed (default is 'default'). - storage (Optional[MinioConfig]): Optional MinioConfig object for storage configuration. - """ - docker_image: str = "" namespace: str = "default" storage: Optional[MinioConfig] = None + url: str = "" @staticmethod def deserialize(data: dict) -> KnativeFunctionConfig: - """ - Deserialize data from dictionary into KnativeFunctionConfig object. - - Args: - data (dict): Dictionary containing serialized data. - - Returns: - KnativeFunctionConfig: Deserialized KnativeFunctionConfig object. - """ keys = list(KnativeFunctionConfig.__dataclass_fields__.keys()) data = {k: v for k, v in data.items() if k in keys} data["runtime"] = Runtime.deserialize(data["runtime"]) - if "storage" in data: - data["storage"] = MinioConfig.deserialize(data["storage"]) + data["storage"] = MinioConfig.deserialize(data["storage"]) return KnativeFunctionConfig(**data) def serialize(self) -> dict: - """ - Serialize KnativeFunctionConfig object into dictionary. - - Returns: - dict: Dictionary containing serialized data. - """ return self.__dict__ @staticmethod def from_benchmark(benchmark: Benchmark) -> KnativeFunctionConfig: - """ - Create KnativeFunctionConfig object from a benchmark. - - Args: - benchmark (Benchmark): Benchmark object. - - Returns: - KnativeFunctionConfig: Initialized KnativeFunctionConfig object. - """ return super(KnativeFunctionConfig, KnativeFunctionConfig)._from_benchmark( benchmark, KnativeFunctionConfig ) - class KnativeFunction(Function): - """ - Class representing a Knative function. - - Attributes: - name (str): Name of the function. - benchmark (str): Benchmark associated with the function. - code_package_hash (str): Hash of the code package associated with the function. - cfg (KnativeFunctionConfig): Configuration object for the function. - """ - def __init__( self, name: str, benchmark: str, code_package_hash: str, cfg: KnativeFunctionConfig ): - """ - Initialize KnativeFunction object. - - Args: - name (str): Name of the function. - benchmark (str): Benchmark associated with the function. - code_package_hash (str): Hash of the code package associated with the function. - cfg (KnativeFunctionConfig): Configuration object for the function. - """ super().__init__(benchmark, name, code_package_hash, cfg) @property def config(self) -> KnativeFunctionConfig: - """ - Get the configuration object of the function. - - Returns: - KnativeFunctionConfig: Configuration object of the function. - """ return cast(KnativeFunctionConfig, self._cfg) @staticmethod def typename() -> str: - """ - Return the typename of the KnativeFunction class. - - Returns: - str: Typename of the KnativeFunction class. - """ return "Knative.Function" def serialize(self) -> dict: - """ - Serialize KnativeFunction object into dictionary. - - Returns: - dict: Dictionary containing serialized data. - """ - serialized_data = super().serialize() - serialized_data["config"] = self._cfg.serialize() - return serialized_data + return {**super().serialize(), "config": self._cfg.serialize()} @staticmethod def deserialize(cached_config: dict) -> KnativeFunction: - """ - Deserialize dictionary into KnativeFunction object. - - Args: - cached_config (dict): Dictionary containing serialized data. + from sebs.faas.function import Trigger + from sebs.knative.triggers import KnativeLibraryTrigger, KnativeHTTPTrigger - Returns: - KnativeFunction: Deserialized KnativeFunction object. - """ cfg = KnativeFunctionConfig.deserialize(cached_config["config"]) ret = KnativeFunction( cached_config["name"], cached_config["benchmark"], cached_config["hash"], cfg @@ -141,7 +60,7 @@ def deserialize(cached_config: dict) -> KnativeFunction: for trigger in cached_config["triggers"]: trigger_type = cast( Trigger, - {"Library": LibraryTrigger, "HTTP": HTTPTrigger}.get(trigger["type"]), + {"Library": KnativeLibraryTrigger, "HTTP": KnativeHTTPTrigger}.get(trigger["type"]), ) assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) ret.add_trigger(trigger_type.deserialize(trigger)) diff --git a/sebs/knative/knative.py b/sebs/knative/knative.py index 0a9160f3..f3e3e90e 100644 --- a/sebs/knative/knative.py +++ b/sebs/knative/knative.py @@ -1,3 +1,6 @@ +from os import devnull +import subprocess +from flask import config from sebs.faas.system import System from sebs.faas.function import Function, Trigger, ExecutionResult from sebs.faas.storage import PersistentStorage @@ -5,24 +8,49 @@ from sebs.config import SeBSConfig from sebs.cache import Cache from sebs.utils import LoggingHandlers +from sebs.knative.storage import KnativeMinio +from sebs.knative.triggers import KnativeLibraryTrigger, KnativeHTTPTrigger from sebs.faas.config import Resources from typing import Dict, Tuple, Type, List, Optional import docker +from .function import KnativeFunction, KnativeFunctionConfig import uuid +from typing import cast from .config import KnativeConfig class KnativeSystem(System): - def __init__(self, system_config: SeBSConfig, cache_client: Cache, docker_client: docker.client, logger_handlers: LoggingHandlers): - super().__init__(system_config, cache_client, docker_client) - # Initialize any additional Knative-specific attributes here _config: KnativeConfig + def __init__(self, system_config: SeBSConfig, config: KnativeConfig, cache_client: Cache, docker_client: docker.client, logger_handlers: LoggingHandlers): + super().__init__(system_config, cache_client, docker_client) + # Initialize any additional Knative-specific attributes here + self._config = config + self._logger_handlers = logger_handlers + + if self.config.resources.docker_username: + if self.config.resources.docker_registry: + docker_client.login( + username=self.config.resources.docker_username, + password=self.config.resources.docker_password, + registry=self.config.resources.docker_registry, + ) + else: + docker_client.login( + username=self.config.resources.docker_username, + password=self.config.resources.docker_password, + ) + + @property def config(self) -> KnativeConfig: # Return the configuration specific to Knative return self._config - + + def get_knative_func_cmd(self) -> List[str]: + cmd = [self.config.knative_exec] + return cmd + @staticmethod def function_type() -> Type[Function]: # Return the specific function type for Knative @@ -68,9 +96,83 @@ def package_code( return docker_image_name, image_size - def create_function(self, code_package: Benchmark, func_name: str) -> Function: - # Implementation for creating functions - return function + def create_function(self, code_package: Benchmark, func_name: str) -> "KnativeFunction": + self.logging.info("Creating Knative function.") + try: + # Check if the function already exists + knative_func_command = subprocess.run( + [*self.get_knative_func_cmd(), "list"], + stderr=subprocess.DEVNULL, + stdout=subprocess.PIPE, + ) + function_found = False + for line in knative_func_command.stdout.decode().split("\n"): + if line and func_name in line.split()[0]: + function_found = True + break + + if function_found: + self.logging.info(f"Function {func_name} already exists.") + # Logic for updating or handling existing function + # For now, just pass + pass + else: + try: + self.logging.info(f"Creating new Knative function {func_name}") + language = code_package.language_name + + # Create the function + subprocess.run( + ["func", "create", "-l", language, func_name], + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + check=True, + ) + + # Deploy the function + subprocess.run( + ["func", "deploy", "--path", func_name], + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + check=True, + ) + + # Retrieve the function URL + describe_command = [*self.get_knative_func_cmd(), "describe", func_name, "-o", "url"] + result = subprocess.run( + describe_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) + function_url = result.stdout.decode().strip() + + # Create the KnativeFunctionConfig + function_cfg = KnativeFunctionConfig.from_benchmark(code_package) + function_cfg.storage = cast(KnativeMinio, self.get_storage()).config + function_cfg.url = function_url + + # Create the function object + res = KnativeFunction( + func_name, code_package.benchmark, code_package.hash, function_cfg + ) + + # Add HTTP trigger with the function URL + trigger = KnativeHTTPTrigger(func_name, function_url) + trigger.logging_handlers = self.logging_handlers + res.add_trigger(trigger) + + return res + + except subprocess.CalledProcessError as e: + self.logging.error(f"Error creating Knative function {func_name}.") + self.logging.error(f"Output: {e.stderr.decode('utf-8')}") + raise RuntimeError(e) + + except FileNotFoundError: + self.logging.error("Could not retrieve Knative functions - is path to func correct?") + raise RuntimeError("Failed to access func binary") + def cached_function(self, function: Function): # Implementation of retrieving cached function details for Knative @@ -100,10 +202,30 @@ def download_metrics(self, function_name: str, start_time: int, end_time: int, r pass def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: - # Implementation of trigger creation for Knative - # have to involve in setting up HTTP routes or event sources - trigger = Trigger(name=f"{function.name}-trigger", type=trigger_type) - return trigger + if trigger_type == Trigger.TriggerType.LIBRARY: + return function.triggers(Trigger.TriggerType.LIBRARY)[0] + elif trigger_type == Trigger.TriggerType.HTTP: + try: + response = subprocess.run( + ["func", "describe", function.name, "--output", "url"], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + check=True, + ) + except FileNotFoundError as e: + self.logging.error( + "Could not retrieve Knative function configuration - is the 'func' CLI installed and configured correctly?" + ) + raise RuntimeError(e) + stdout = response.stdout.decode("utf-8") + url = stdout.strip() + trigger = KnativeHTTPTrigger(function.name, url) + trigger.logging_handlers = self.logging_handlers + function.add_trigger(trigger) + self.cache_client.update_function(function) + return trigger + else: + raise RuntimeError("Not supported!") def shutdown(self) -> None: # Clean up any resources or connections diff --git a/sebs/knative/trigger.py b/sebs/knative/trigger.py deleted file mode 100644 index 22c81568..00000000 --- a/sebs/knative/trigger.py +++ /dev/null @@ -1,295 +0,0 @@ -import concurrent.futures -import datetime -import json -import requests -import subprocess -import time -from typing import Dict, List, Optional # noqa - -from sebs.faas.function import ExecutionResult, Trigger - - -class LibraryTrigger(Trigger): - """ - Trigger implementation for invoking a Knative service using port forwarding and curl. - - Attributes: - function_name (str): The name of the function to invoke. - pod_name (str): The name of the Kubernetes pod where the function is deployed. - namespace (str): The Kubernetes namespace where the pod is deployed (default is 'default'). - """ - - def __init__(self, function_name: str, pod_name: str, namespace: str = "default"): - """ - Initialize the LibraryTrigger with the function name, pod name, and namespace. - - Args: - function_name (str): The name of the function to invoke. - pod_name (str): The name of the Kubernetes pod where the function is deployed. - namespace (str, optional): The Kubernetes namespace where the pod is deployed (default is 'default'). - """ - super().__init__() - self.function_name = function_name - self.pod_name = pod_name - self.namespace = namespace - - @staticmethod - def trigger_type() -> "Trigger.TriggerType": - """ - Return the type of trigger (LibraryTrigger). - - Returns: - Trigger.TriggerType: The trigger type (LibraryTrigger). - """ - return Trigger.TriggerType.LIBRARY - - @staticmethod - def get_curl_command(payload: dict) -> List[str]: - """ - Generate a curl command for invoking the function. - - Args: - payload (dict): The payload data to send with the request. - - Returns: - List[str]: The curl command as a list of strings. - """ - return [ - "curl", - "-X", - "POST", - "http://localhost:8080/handle", - "-d", - json.dumps(payload), - "-H", - "Content-Type: application/json", - ] - - def sync_invoke(self, payload: dict) -> ExecutionResult: - """ - Synchronously invoke the function using port forwarding and curl. - - Args: - payload (dict): The payload data to send with the request. - - Returns: - ExecutionResult: The result of the function invocation. - """ - port_forward_cmd = [ - "kubectl", - "port-forward", - f"pod/{self.pod_name}", - "8080:8080", - "-n", - self.namespace, - ] - - # Start port forwarding - port_forward_proc = subprocess.Popen( - port_forward_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - time.sleep(2) # Give some time for port forwarding to start - - command = self.get_curl_command(payload) - error = None - try: - begin = datetime.datetime.now() - response = subprocess.run( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True, - ) - end = datetime.datetime.now() - parsed_response = response.stdout.decode("utf-8") - except (subprocess.CalledProcessError, FileNotFoundError) as e: - end = datetime.datetime.now() - error = e - - # Stop port forwarding - port_forward_proc.terminate() - - knative_result = ExecutionResult.from_times(begin, end) - if error is not None: - self.logging.error(f"Invocation of {self.function_name} failed!") - knative_result.stats.failure = True - return knative_result - - return_content = json.loads(parsed_response) - knative_result.parse_benchmark_output(return_content) - return knative_result - - def async_invoke(self, payload: dict) -> concurrent.futures.Future: - """ - Asynchronously invoke the function using port forwarding and curl. - - Args: - payload (dict): The payload data to send with the request. - - Returns: - concurrent.futures.Future: A future representing the asynchronous invocation. - """ - pool = concurrent.futures.ThreadPoolExecutor() - fut = pool.submit(self.sync_invoke, payload) - return fut - - def serialize(self) -> dict: - """ - Serialize the trigger configuration. - - Returns: - dict: A dictionary representing the serialized trigger configuration. - """ - return { - "type": "Library", - "name": self.function_name, - "pod_name": self.pod_name, - "namespace": self.namespace, - } - - @staticmethod - def deserialize(obj: dict) -> Trigger: - """ - Deserialize a dictionary into a LibraryTrigger object. - - Args: - obj (dict): The dictionary containing the serialized trigger configuration. - - Returns: - Trigger: A LibraryTrigger object instantiated from the serialized data. - """ - return LibraryTrigger(obj["name"], obj["pod_name"], obj["namespace"]) - - @staticmethod - def typename() -> str: - """ - Return the typename of the trigger (Knative.LibraryTrigger). - - Returns: - str: The typename of the trigger. - """ - return "Knative.LibraryTrigger" - - -class HTTPTrigger(Trigger): - """ - Trigger implementation for invoking a Knative service via HTTP. - - Attributes: - function_name (str): The name of the function to invoke. - url (str): The URL of the Knative service endpoint. - """ - - def __init__(self, function_name: str, url: str): - """ - Initialize the HTTPTrigger with the function name and service URL. - - Args: - function_name (str): The name of the function to invoke. - url (str): The URL of the Knative service endpoint. - """ - super().__init__() - self.function_name = function_name - self.url = url - - @staticmethod - def typename() -> str: - """ - Return the typename of the trigger (Knative.HTTPTrigger). - - Returns: - str: The typename of the trigger. - """ - return "Knative.HTTPTrigger" - - @staticmethod - def trigger_type() -> Trigger.TriggerType: - """ - Return the type of trigger (HTTPTrigger). - - Returns: - Trigger.TriggerType: The trigger type (HTTPTrigger). - """ - return Trigger.TriggerType.HTTP - - def sync_invoke(self, payload: dict) -> ExecutionResult: - """ - Synchronously invoke the function via HTTP POST request. - - Args: - payload (dict): The payload data to send with the request. - - Returns: - ExecutionResult: The result of the function invocation. - """ - self.logging.debug(f"Invoke function {self.url}") - return self._http_invoke(payload, self.url, False) - - def async_invoke(self, payload: dict) -> concurrent.futures.Future: - """ - Asynchronously invoke the function via HTTP POST request. - - Args: - payload (dict): The payload data to send with the request. - - Returns: - concurrent.futures.Future: A future representing the asynchronous invocation. - """ - pool = concurrent.futures.ThreadPoolExecutor() - fut = pool.submit(self.sync_invoke, payload) - return fut - - def _http_invoke(self, payload: dict, url: str, async_invoke: bool) -> ExecutionResult: - """ - Helper method for invoking the function via HTTP POST request. - - Args: - payload (dict): The payload data to send with the request. - url (str): The URL of the Knative service endpoint. - async_invoke (bool): Whether the invocation is asynchronous (not used in this method). - - Returns: - ExecutionResult: The result of the function invocation. - """ - headers = {'Content-Type': 'application/json'} - error = None - try: - begin = datetime.datetime.now() - response = requests.post(url, json=payload, headers=headers) - end = datetime.datetime.now() - response.raise_for_status() - parsed_response = response.json() - except (requests.RequestException, ValueError) as e: - end = datetime.datetime.now() - error = e - - knative_result = ExecutionResult.from_times(begin, end) - if error is not None: - self.logging.error(f"HTTP invocation of {self.function_name} failed!") - knative_result.stats.failure = True - return knative_result - - knative_result.parse_benchmark_output(parsed_response) - return knative_result - - def serialize(self) -> dict: - """ - Serialize the trigger configuration. - - Returns: - dict: A dictionary representing the serialized trigger configuration. - """ - return {"type": "HTTP", "fname": self.function_name, "url": self.url} - - @staticmethod - def deserialize(obj: dict) -> Trigger: - """ - Deserialize a dictionary into an HTTPTrigger object. - - Args: - obj (dict): The dictionary containing the serialized trigger configuration. - - Returns: - Trigger: An HTTPTrigger object instantiated from the serialized data. - """ - return HTTPTrigger(obj["fname"], obj["url"]) diff --git a/sebs/knative/triggers.py b/sebs/knative/triggers.py new file mode 100644 index 00000000..b1f01f05 --- /dev/null +++ b/sebs/knative/triggers.py @@ -0,0 +1,106 @@ +import concurrent.futures +import datetime +import json +import subprocess +from typing import Dict, List, Optional + +from sebs.faas.function import ExecutionResult, Trigger + +class KnativeLibraryTrigger(Trigger): + def __init__(self, fname: str, func_cmd: Optional[List[str]] = None): + super().__init__() + self.fname = fname + if func_cmd: + self._func_cmd = [*func_cmd, "invoke", "--target", "remote"] + + @staticmethod + def trigger_type() -> "Trigger.TriggerType": + return Trigger.TriggerType.LIBRARY + + @property + def func_cmd(self) -> List[str]: + assert self._func_cmd + return self._func_cmd + + @func_cmd.setter + def func_cmd(self, func_cmd: List[str]): + self._func_cmd = [*func_cmd, "invoke", "--target", "remote"] + + @staticmethod + def get_command(payload: dict) -> List[str]: + params = ["--data", json.dumps(payload)] + return params + + def sync_invoke(self, payload: dict) -> ExecutionResult: + command = self.func_cmd + self.get_command(payload) + error = None + try: + begin = datetime.datetime.now() + response = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) + end = datetime.datetime.now() + parsed_response = response.stdout.decode("utf-8") + except (subprocess.CalledProcessError, FileNotFoundError) as e: + end = datetime.datetime.now() + error = e + + knative_result = ExecutionResult.from_times(begin, end) + if error is not None: + self.logging.error("Invocation of {} failed!".format(self.fname)) + knative_result.stats.failure = True + return knative_result + + return_content = json.loads(parsed_response) + knative_result.parse_benchmark_output(return_content) + return knative_result + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return {"type": "Library", "name": self.fname} + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return KnativeLibraryTrigger(obj["name"]) + + @staticmethod + def typename() -> str: + return "Knative.LibraryTrigger" + + +class KnativeHTTPTrigger(Trigger): + def __init__(self, fname: str, url: str): + super().__init__() + self.fname = fname + self.url = url + + @staticmethod + def typename() -> str: + return "Knative.HTTPTrigger" + + @staticmethod + def trigger_type() -> Trigger.TriggerType: + return Trigger.TriggerType.HTTP + + def sync_invoke(self, payload: dict) -> ExecutionResult: + self.logging.debug(f"Invoke function {self.url}") + return self._http_invoke(payload, self.url, False) + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return {"type": "HTTP", "fname": self.fname, "url": self.url} + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return KnativeHTTPTrigger(obj["fname"], obj["url"])