From 775e83b1fe13f5f9efcccf3c64249d96b1a28ea2 Mon Sep 17 00:00:00 2001 From: orosca Date: Sat, 27 Jul 2024 22:59:01 -0400 Subject: [PATCH 1/7] Queue proposal --- sebs/aws/queue.py | 68 +++++++++++++++++++++++++++++++++++++++++++++ sebs/azure/queue.py | 63 +++++++++++++++++++++++++++++++++++++++++ sebs/faas/queue.py | 65 +++++++++++++++++++++++++++++++++++++++++++ sebs/gcp/queue.py | 55 ++++++++++++++++++++++++++++++++++++ 4 files changed, 251 insertions(+) create mode 100644 sebs/aws/queue.py create mode 100644 sebs/azure/queue.py create mode 100644 sebs/faas/queue.py create mode 100644 sebs/gcp/queue.py diff --git a/sebs/aws/queue.py b/sebs/aws/queue.py new file mode 100644 index 00000000..ae247607 --- /dev/null +++ b/sebs/aws/queue.py @@ -0,0 +1,68 @@ +from sebs.cache import Cache +from sebs.faas.config import Resources +from sebs.faas.queue import Queue + +import boto3 + + +class SQS(Queue): + @staticmethod + def typename() -> str: + return "AWS.SQS" + + @staticmethod + def deployment_name(): + return "aws" + + @property + def queue_url(self): + return self._queue_url + + def __init__( + self, + benchmark: str, + queue_type: Queue.QueueType, + session: boto3.session.Session, + cache_client: Cache, + resources: Resources, + region: str + ): + super().__init__(benchmark, queue_type, region, cache_client, resources) + self.client = session.client( + "sqs", + region_name=region + ) + + def create_queue(self) -> str: + # Create queue + self.logging.debug(f"Creating queue {self.name}") + + self._queue_url = self.client.create_queue(QueueName=self.name)["QueueUrl"] + queue_arn = self.client.get_queue_attributes( + QueueUrl=self.queue_url, + AttributeNames=["QueueArn"] + )["Attributes"]["QueueArn"] + + self.logging.debug("Created queue") + + if (self.queue_type == Queue.QueueType.TRIGGER): + # Add queue trigger + if (not len(self.client.list_event_source_mappings(EventSourceArn=queue_arn, + FunctionName=self.name) + ["EventSourceMappings"])): + self.client.create_event_source_mapping( + EventSourceArn=queue_arn, + FunctionName=self.name, + MaximumBatchingWindowInSeconds=1 + ) + + def remove_queue(self): + raise NotImplementedError() + + def send_message(self, serialized_message: str): + self.client.send_message( + QueueUrl=self.queue_url, MessageBody=serialized_message) + self.logging.info(f"Sent message to queue {self.name}") + + def receive_message(self) -> str: + raise NotImplementedError() \ No newline at end of file diff --git a/sebs/azure/queue.py b/sebs/azure/queue.py new file mode 100644 index 00000000..219890be --- /dev/null +++ b/sebs/azure/queue.py @@ -0,0 +1,63 @@ +from sebs.cache import Cache +from sebs.faas.config import Resources +from sebs.faas.queue import Queue + +from azure.core.exceptions import ResourceExistsError +from azure.identity import DefaultAzureCredential +from azure.storage.blob import BlobServiceClient +from azure.storage.queue import QueueClient + + +class AzureQueue(Queue): + @staticmethod + def typename() -> str: + return "Azure.Queue" + + @staticmethod + def deployment_name(): + return "azure" + + @property + def storage_account(self) -> str: + assert self._storage_account + return self._storage_account + + @property + def account_url(self) -> str: + return f"https://{self.storage_account}.queue.core.windows.net" + + def __init__( + self, + benchmark: str, + queue_type: Queue.QueueType, + cache_client: Cache, + resources: Resources, + region: str, + storage_account: str, + ): + default_credential = DefaultAzureCredential() + + super().__init__(benchmark, queue_type, region, cache_client, resources) + self._storage_account = storage_account + self.client = QueueClient(self.account_url, + queue_name=self.name, + credential=default_credential) + + def create_queue(self): + self.logging.info(f"Creating queue {self.name}") + + try: + self.client.create_queue() + self.logging.info("Created queue") + except ResourceExistsError: + self.logging.info("Queue already exists, reusing...") + + def remove_queue(self): + raise NotImplementedError() + + def send_message(self, serialized_message: str): + self.client.send_message(serialized_message) + self.logging.info(f"Sent message to queue {self.queue_name}") + + def receive_message(self) -> str: + raise NotImplementedError() \ No newline at end of file diff --git a/sebs/faas/queue.py b/sebs/faas/queue.py new file mode 100644 index 00000000..00a098f0 --- /dev/null +++ b/sebs/faas/queue.py @@ -0,0 +1,65 @@ +from abc import ABC +from abc import abstractmethod +from enum import Enum + +from sebs.faas.config import Resources +from sebs.cache import Cache +from sebs.utils import LoggingBase + +class Queue(ABC, LoggingBase): + class QueueType(str, Enum): + TRIGGER = "trigger" + RESULT = "result" + + @staticmethod + def deserialize(val: str) -> Queue.QueueType: + for member in Queue.QueueType: + if member.value == val: + return member + raise Exception(f"Unknown queue type {val}") + + @staticmethod + @abstractmethod + def deployment_name() -> str: + pass + + @property + def cache_client(self) -> Cache: + return self._cache_client + + @property + def region(self): + return self._region + + @property + def queue_type(self): + return self._queue_type + + @property + def name(self): + return self._name + + def __init__(self, benchmark: str, queue_type: Queue.QueueType, region: str, cache_client: Cache, resources: Resources): + super().__init__() + self._name = "{}-{}".format(benchmark, queue_type) + self._queue_type = queue_type + self._cache_client = cache_client + self._cached = False + self._region = region + self._cloud_resources = resources + + @abstractmethod + def create_queue(self): + pass + + @abstractmethod + def remove_queue(self): + pass + + @abstractmethod + def send_message(self, serialized_message: str): + pass + + @abstractmethod + def receive_message(self) -> str: + pass \ No newline at end of file diff --git a/sebs/gcp/queue.py b/sebs/gcp/queue.py new file mode 100644 index 00000000..654fe4ab --- /dev/null +++ b/sebs/gcp/queue.py @@ -0,0 +1,55 @@ +from googleapiclient.discovery import build + +from sebs.cache import Cache +from sebs.faas.config import Resources +from sebs.faas.queue import Queue + +from google.cloud import storage as gcp_storage +from googleapiclient.errors import HttpError + + +class GCPQueue(Queue): + @staticmethod + def typename() -> str: + return "GCP.Queue" + + @staticmethod + def deployment_name(): + return "gcp" + + def __init__( + self, + benchmark: str, + queue_type: Queue.QueueType, + cache_client: Cache, + resources: Resources, + region: str + ): + super().__init__(benchmark, queue_type, region, cache_client, resources) + self.client = build("pubsub", "v1", cache_discovery=False) + + def create_queue(self): + self.logging.info(f"Creating queue '{self.name}'") + + try: + self.client.projects().topics().create(name=self.name).execute() + self.logging.info("Created queue") + except HttpError as http_error: + if http_error.resp.status == 409: + self.logging.info("Queue already exists, reusing...") + + def remove_queue(self): + raise NotImplementedError() + + def send_message(self, serialized_message: str): + self.client.projects().topics().publish( + topic=self.name, + body={ + "messages": [{ + "data": serialized_message.decode("utf-8") + }], + } + ).execute() + + def receive_message(self) -> str: + raise NotImplementedError() \ No newline at end of file From 557dea349e987aadd3d6f8fb24718e70b363937b Mon Sep 17 00:00:00 2001 From: orosca Date: Fri, 30 Aug 2024 01:59:17 +0300 Subject: [PATCH 2/7] Queue classes --- requirements.gcp.txt | 1 + sebs/aws/queue.py | 37 +++++++++++++---- sebs/azure/queue.py | 23 +++++++++-- sebs/faas/queue.py | 17 +++----- sebs/gcp/queue.py | 98 ++++++++++++++++++++++++++++++++++---------- 5 files changed, 132 insertions(+), 44 deletions(-) diff --git a/requirements.gcp.txt b/requirements.gcp.txt index 9cb90916..d52de146 100644 --- a/requirements.gcp.txt +++ b/requirements.gcp.txt @@ -4,3 +4,4 @@ google-api-python-client==1.12.5 google-cloud-monitoring==2.0.0 google-api-python-client-stubs google-cloud-logging==2.0.0 +google-cloud-pubsub=2.23.0 diff --git a/sebs/aws/queue.py b/sebs/aws/queue.py index ae247607..050974e3 100644 --- a/sebs/aws/queue.py +++ b/sebs/aws/queue.py @@ -30,39 +30,60 @@ def __init__( super().__init__(benchmark, queue_type, region, cache_client, resources) self.client = session.client( "sqs", - region_name=region + region_name=region, ) def create_queue(self) -> str: - # Create queue self.logging.debug(f"Creating queue {self.name}") self._queue_url = self.client.create_queue(QueueName=self.name)["QueueUrl"] queue_arn = self.client.get_queue_attributes( QueueUrl=self.queue_url, - AttributeNames=["QueueArn"] + AttributeNames=["QueueArn"], )["Attributes"]["QueueArn"] self.logging.debug("Created queue") if (self.queue_type == Queue.QueueType.TRIGGER): - # Add queue trigger + # Make it an actual trigger for the function. GCP and Azure use + # different mechanisms so this is skipped for them. if (not len(self.client.list_event_source_mappings(EventSourceArn=queue_arn, FunctionName=self.name) ["EventSourceMappings"])): self.client.create_event_source_mapping( EventSourceArn=queue_arn, FunctionName=self.name, - MaximumBatchingWindowInSeconds=1 + MaximumBatchingWindowInSeconds=1, ) def remove_queue(self): - raise NotImplementedError() + self.logging.info(f"Deleting queue {self.name}") + + self.client.delete_queue(QueueUrl=self.queue_url) + + self.logging.info("Deleted queue") def send_message(self, serialized_message: str): self.client.send_message( - QueueUrl=self.queue_url, MessageBody=serialized_message) + QueueUrl=self.queue_url, + MessageBody=serialized_message, + ) self.logging.info(f"Sent message to queue {self.name}") def receive_message(self) -> str: - raise NotImplementedError() \ No newline at end of file + self.logging.info(f"Pulling a message from {self.name}") + + response = self.client.receive_message( + QueueUrl=self.queue_url, + MessageSystemAttributeNames=["SentTimestamp"], + MaxNumberOfMessages=1, + MessageAttributeNames=["All"], + WaitTimeSeconds=5, + ) + + if ("Messages" not in response): + self.logging.info("No messages to be received") + return + + self.logging.info(f"Received a message from {self.name}") + return response["Messages"][0]["Body"] \ No newline at end of file diff --git a/sebs/azure/queue.py b/sebs/azure/queue.py index 219890be..1e849348 100644 --- a/sebs/azure/queue.py +++ b/sebs/azure/queue.py @@ -1,6 +1,6 @@ from sebs.cache import Cache from sebs.faas.config import Resources -from sebs.faas.queue import Queue +from sebs.faas.queue import Queue, QueueType from azure.core.exceptions import ResourceExistsError from azure.identity import DefaultAzureCredential @@ -29,7 +29,7 @@ def account_url(self) -> str: def __init__( self, benchmark: str, - queue_type: Queue.QueueType, + queue_type: QueueType, cache_client: Cache, resources: Resources, region: str, @@ -53,11 +53,26 @@ def create_queue(self): self.logging.info("Queue already exists, reusing...") def remove_queue(self): - raise NotImplementedError() + self.logging.info(f"Deleting queue {self.name}") + + self.client.delete_queue() + + self.logging.info("Deleted queue") def send_message(self, serialized_message: str): self.client.send_message(serialized_message) self.logging.info(f"Sent message to queue {self.queue_name}") def receive_message(self) -> str: - raise NotImplementedError() \ No newline at end of file + self.logging.info(f"Pulling a message from {self.name}") + + response = self.client.receive_messages( + max_messages=1, + timeout=5, + ) + + if (len(response) == 0): + self.logging.info("No messages to be received") + return + + return response[0].content \ No newline at end of file diff --git a/sebs/faas/queue.py b/sebs/faas/queue.py index 00a098f0..9e7ea938 100644 --- a/sebs/faas/queue.py +++ b/sebs/faas/queue.py @@ -6,17 +6,12 @@ from sebs.cache import Cache from sebs.utils import LoggingBase -class Queue(ABC, LoggingBase): - class QueueType(str, Enum): - TRIGGER = "trigger" - RESULT = "result" +class QueueType(str, Enum): + TRIGGER = "trigger" + RESULT = "result" + - @staticmethod - def deserialize(val: str) -> Queue.QueueType: - for member in Queue.QueueType: - if member.value == val: - return member - raise Exception(f"Unknown queue type {val}") +class Queue(ABC, LoggingBase): @staticmethod @abstractmethod @@ -39,7 +34,7 @@ def queue_type(self): def name(self): return self._name - def __init__(self, benchmark: str, queue_type: Queue.QueueType, region: str, cache_client: Cache, resources: Resources): + def __init__(self, benchmark: str, queue_type: QueueType, region: str, cache_client: Cache, resources: Resources): super().__init__() self._name = "{}-{}".format(benchmark, queue_type) self._queue_type = queue_type diff --git a/sebs/gcp/queue.py b/sebs/gcp/queue.py index 654fe4ab..fbd5c905 100644 --- a/sebs/gcp/queue.py +++ b/sebs/gcp/queue.py @@ -2,10 +2,13 @@ from sebs.cache import Cache from sebs.faas.config import Resources -from sebs.faas.queue import Queue +from sebs.faas.queue import Queue, QueueType -from google.cloud import storage as gcp_storage -from googleapiclient.errors import HttpError +from google.api_core import retry +from google.api_core.exceptions import AlreadyExists +from google.cloud import pubsub_v1 + +import os class GCPQueue(Queue): @@ -17,39 +20,92 @@ def typename() -> str: def deployment_name(): return "gcp" + @property + def topic_name(self): + return self._topic_name + + @property + def subscription_name(self): + return self._subscription_name + + @property + def subscription_client(self): + return self._subscription_client + def __init__( self, benchmark: str, - queue_type: Queue.QueueType, + queue_type: QueueType, cache_client: Cache, resources: Resources, region: str ): super().__init__(benchmark, queue_type, region, cache_client, resources) - self.client = build("pubsub", "v1", cache_discovery=False) + self.client = pubsub_v1.PublisherClient() + self._subscription_client = pubsub_v1.SubscriberClient() - def create_queue(self): - self.logging.info(f"Creating queue '{self.name}'") + self._topic_name = 'projects/{project_id}/topics/{topic}'.format( + project_id=os.getenv('GOOGLE_CLOUD_PROJECT'), + topic=self.name, + ) + self._subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format( + project_id=os.getenv('GOOGLE_CLOUD_PROJECT'), + sub=self.name, + ) + def create_queue(self): + self.logging.info(f"Creating queue {self.name}") try: - self.client.projects().topics().create(name=self.name).execute() + self.client.create_topic(name=self.topic_name) self.logging.info("Created queue") - except HttpError as http_error: - if http_error.resp.status == 409: - self.logging.info("Queue already exists, reusing...") + except AlreadyExists: + self.logging.info("Queue already exists, reusing...") + + # GCP additionally needs a 'subscription' resource which is the + # actual receiver of the messages. It is constructed and destructed + # alongside the topic at all times. + self.logging.info(f"Creating queue subscription") + try: + self.subscription_client.create_subscription( + name=self.subscription_name, + topic=self.topic_name + ) + self.logging.info("Created queue subscription") + except AlreadyExists: + self.logging.info("Subscription already exists, reusing...") def remove_queue(self): - raise NotImplementedError() + self.logging.info(f"Deleting queue and associated subscription{self.name}") + + self.client.delete_topic(topic=self.topic_name) + self.subscription_client.delete_subscription(subscription=self.subscription_name) + + self.logging.info("Deleted queue and associated subscription") def send_message(self, serialized_message: str): - self.client.projects().topics().publish( - topic=self.name, - body={ - "messages": [{ - "data": serialized_message.decode("utf-8") - }], - } - ).execute() + self.client.publish(self.topic_name, serialized_message.decode("utf-8")) + self.logging.info(f"Sent message to queue {self.name}") + # Receive messages through the 'pull' (sync) method. def receive_message(self) -> str: - raise NotImplementedError() \ No newline at end of file + self.logging.info(f"Pulling a message from {self.name}") + + response = self.subscription_client.pull( + subscription=self.subscription_name, + max_messages=1, + retry=retry.Retry(deadline=5), + ) + + if (len(response.received_messages) == 0): + self.logging.info("No messages to be received") + return + + # Acknowledge the received message so it is not sent again. + received_message = response.received_messages[0] + self.subscription_client.acknowledge( + subscription=self.subscription_name, + ack_ids=[received_message.ack_id], + ) + self.logging.info(f"Received a message from {self.name}") + + return received_message.message.data \ No newline at end of file From 452c26ca4c99698708e9f5c841889f4eac2bff1b Mon Sep 17 00:00:00 2001 From: orosca Date: Mon, 9 Sep 2024 00:32:02 +0200 Subject: [PATCH 3/7] Tweaks --- sebs/aws/queue.py | 85 +++++++++++++++++++++++++++++++-------------- sebs/azure/queue.py | 42 +++++++++++++++++----- sebs/faas/queue.py | 33 +++++++++++++----- sebs/gcp/queue.py | 32 ++++++++++++++--- 4 files changed, 143 insertions(+), 49 deletions(-) diff --git a/sebs/aws/queue.py b/sebs/aws/queue.py index 050974e3..79a28fd1 100644 --- a/sebs/aws/queue.py +++ b/sebs/aws/queue.py @@ -1,6 +1,8 @@ +from typing import Optional, cast +from sebs.aws.aws import AWS from sebs.cache import Cache from sebs.faas.config import Resources -from sebs.faas.queue import Queue +from sebs.faas.queue import Queue, QueueType import boto3 @@ -18,44 +20,48 @@ def deployment_name(): def queue_url(self): return self._queue_url + @property + def queue_arn(self): + return self._queue_arn + def __init__( self, benchmark: str, - queue_type: Queue.QueueType, - session: boto3.session.Session, - cache_client: Cache, - resources: Resources, - region: str + queue_type: QueueType, + region: str, + queue_url: Optional[str] = None, + queue_arn: Optional[str] = None ): - super().__init__(benchmark, queue_type, region, cache_client, resources) - self.client = session.client( - "sqs", - region_name=region, + super().__init__( + benchmark, + queue_type, + region + # deployment_client.cache_client, + # deployment_client.config.resources + ) + self._queue_url = queue_url + self._queue_arn = queue_arn + + self.client = boto3.session.Session().client( + 'sqs', + region_name=self.region ) def create_queue(self) -> str: self.logging.debug(f"Creating queue {self.name}") + if (self._queue_url and self._queue_arn): + self.logging.debug("Queue already exists, reusing...") + return + self._queue_url = self.client.create_queue(QueueName=self.name)["QueueUrl"] - queue_arn = self.client.get_queue_attributes( + self._queue_arn = self.client.get_queue_attributes( QueueUrl=self.queue_url, AttributeNames=["QueueArn"], )["Attributes"]["QueueArn"] self.logging.debug("Created queue") - if (self.queue_type == Queue.QueueType.TRIGGER): - # Make it an actual trigger for the function. GCP and Azure use - # different mechanisms so this is skipped for them. - if (not len(self.client.list_event_source_mappings(EventSourceArn=queue_arn, - FunctionName=self.name) - ["EventSourceMappings"])): - self.client.create_event_source_mapping( - EventSourceArn=queue_arn, - FunctionName=self.name, - MaximumBatchingWindowInSeconds=1, - ) - def remove_queue(self): self.logging.info(f"Deleting queue {self.name}") @@ -75,7 +81,7 @@ def receive_message(self) -> str: response = self.client.receive_message( QueueUrl=self.queue_url, - MessageSystemAttributeNames=["SentTimestamp"], + AttributeNames=["SentTimestamp"], MaxNumberOfMessages=1, MessageAttributeNames=["All"], WaitTimeSeconds=5, @@ -83,7 +89,34 @@ def receive_message(self) -> str: if ("Messages" not in response): self.logging.info("No messages to be received") - return + return "" self.logging.info(f"Received a message from {self.name}") - return response["Messages"][0]["Body"] \ No newline at end of file + + # Delete the message from the queue - serves as an acknowledgement + # that it was received. + self.client.delete_message( + QueueUrl=self.queue_url, + ReceiptHandle=response["Messages"][0]["ReceiptHandle"], + ) + + return response["Messages"][0]["Body"] + + def serialize(self) -> dict: + return { + "name": self.name, + "type": self.queue_type, + "region": self.region, + "queue_url": self.queue_url, + "queue_arn": self.queue_arn, + } + + @staticmethod + def deserialize(obj: dict) -> "SQS": + return SQS( + obj["name"], + obj["type"], + obj["region"], + obj["queue_url"], + obj["queue_arn"] + ) diff --git a/sebs/azure/queue.py b/sebs/azure/queue.py index 1e849348..fe1856c3 100644 --- a/sebs/azure/queue.py +++ b/sebs/azure/queue.py @@ -30,14 +30,20 @@ def __init__( self, benchmark: str, queue_type: QueueType, - cache_client: Cache, - resources: Resources, - region: str, + # cache_client: Cache, + # resources: Resources, storage_account: str, + region: str ): default_credential = DefaultAzureCredential() - super().__init__(benchmark, queue_type, region, cache_client, resources) + super().__init__( + benchmark, + queue_type, + region + # cache_client, + # resources + ) self._storage_account = storage_account self.client = QueueClient(self.account_url, queue_name=self.name, @@ -61,7 +67,7 @@ def remove_queue(self): def send_message(self, serialized_message: str): self.client.send_message(serialized_message) - self.logging.info(f"Sent message to queue {self.queue_name}") + self.logging.info(f"Sent message to queue {self.name}") def receive_message(self) -> str: self.logging.info(f"Pulling a message from {self.name}") @@ -71,8 +77,26 @@ def receive_message(self) -> str: timeout=5, ) - if (len(response) == 0): - self.logging.info("No messages to be received") - return + for msg in response: + self.client.delete_message(msg) + return msg.content + + self.logging.info("No messages to be received") + return "" + + def serialize(self) -> dict: + return { + "name": self.name, + "type": self.queue_type, + "storage_account": self.storage_account, + "region": self.region + } - return response[0].content \ No newline at end of file + @staticmethod + def deserialize(obj: dict) -> "AzureQueue": + return AzureQueue( + obj["name"], + obj["type"], + obj["storage_account"], + obj["region"] + ) diff --git a/sebs/faas/queue.py b/sebs/faas/queue.py index 9e7ea938..e38565cc 100644 --- a/sebs/faas/queue.py +++ b/sebs/faas/queue.py @@ -1,8 +1,9 @@ from abc import ABC from abc import abstractmethod from enum import Enum +from typing import Optional -from sebs.faas.config import Resources +# from sebs.faas.config import Resources from sebs.cache import Cache from sebs.utils import LoggingBase @@ -18,9 +19,9 @@ class Queue(ABC, LoggingBase): def deployment_name() -> str: pass - @property - def cache_client(self) -> Cache: - return self._cache_client + # @property + # def cache_client(self) -> Cache: + # return self._cache_client @property def region(self): @@ -34,14 +35,28 @@ def queue_type(self): def name(self): return self._name - def __init__(self, benchmark: str, queue_type: QueueType, region: str, cache_client: Cache, resources: Resources): + def __init__( + self, + benchmark: str, + queue_type: QueueType, + region: str + # cache_client: Optional[Cache], + # resources: Optional[Resources] + ): super().__init__() - self._name = "{}-{}".format(benchmark, queue_type) + self._name = None + if (queue_type == QueueType.RESULT): + self._name = "{}-{}".format(benchmark, queue_type) + else: + self._name = benchmark + # TODO(oana) maybe think of a better way + if (benchmark.endswith("-result") or benchmark.endswith("-trigger")): + self._name = benchmark self._queue_type = queue_type - self._cache_client = cache_client + # self._cache_client = cache_client self._cached = False self._region = region - self._cloud_resources = resources + # self._cloud_resources = resources @abstractmethod def create_queue(self): @@ -57,4 +72,4 @@ def send_message(self, serialized_message: str): @abstractmethod def receive_message(self) -> str: - pass \ No newline at end of file + pass diff --git a/sebs/gcp/queue.py b/sebs/gcp/queue.py index fbd5c905..41a9ea27 100644 --- a/sebs/gcp/queue.py +++ b/sebs/gcp/queue.py @@ -1,3 +1,4 @@ +from typing import Optional from googleapiclient.discovery import build from sebs.cache import Cache @@ -36,11 +37,17 @@ def __init__( self, benchmark: str, queue_type: QueueType, - cache_client: Cache, - resources: Resources, + # cache_client: Cache, + # resources: Resources, region: str ): - super().__init__(benchmark, queue_type, region, cache_client, resources) + super().__init__( + benchmark, + queue_type, + region + # cache_client, + # resources + ) self.client = pubsub_v1.PublisherClient() self._subscription_client = pubsub_v1.SubscriberClient() @@ -98,7 +105,7 @@ def receive_message(self) -> str: if (len(response.received_messages) == 0): self.logging.info("No messages to be received") - return + return "" # Acknowledge the received message so it is not sent again. received_message = response.received_messages[0] @@ -108,4 +115,19 @@ def receive_message(self) -> str: ) self.logging.info(f"Received a message from {self.name}") - return received_message.message.data \ No newline at end of file + return received_message.message.data + + def serialize(self) -> dict: + return { + "name": self.name, + "type": self.queue_type, + "region": self.region, + } + + @staticmethod + def deserialize(obj: dict) -> "GCPQueue": + return GCPQueue( + obj["name"], + obj["type"], + obj["region"], + ) From e3999f8f1838f7b74299293932414743211e93f8 Mon Sep 17 00:00:00 2001 From: orosca Date: Mon, 9 Sep 2024 09:40:10 +0200 Subject: [PATCH 4/7] Add caching, small tweaks --- sebs/aws/queue.py | 2 -- sebs/azure/queue.py | 4 ---- sebs/faas/queue.py | 25 ++++++------------------- sebs/gcp/queue.py | 4 ---- 4 files changed, 6 insertions(+), 29 deletions(-) diff --git a/sebs/aws/queue.py b/sebs/aws/queue.py index 79a28fd1..2448445e 100644 --- a/sebs/aws/queue.py +++ b/sebs/aws/queue.py @@ -36,8 +36,6 @@ def __init__( benchmark, queue_type, region - # deployment_client.cache_client, - # deployment_client.config.resources ) self._queue_url = queue_url self._queue_arn = queue_arn diff --git a/sebs/azure/queue.py b/sebs/azure/queue.py index fe1856c3..8299f4d0 100644 --- a/sebs/azure/queue.py +++ b/sebs/azure/queue.py @@ -30,8 +30,6 @@ def __init__( self, benchmark: str, queue_type: QueueType, - # cache_client: Cache, - # resources: Resources, storage_account: str, region: str ): @@ -41,8 +39,6 @@ def __init__( benchmark, queue_type, region - # cache_client, - # resources ) self._storage_account = storage_account self.client = QueueClient(self.account_url, diff --git a/sebs/faas/queue.py b/sebs/faas/queue.py index e38565cc..b0b5b2ca 100644 --- a/sebs/faas/queue.py +++ b/sebs/faas/queue.py @@ -1,10 +1,7 @@ from abc import ABC from abc import abstractmethod from enum import Enum -from typing import Optional -# from sebs.faas.config import Resources -from sebs.cache import Cache from sebs.utils import LoggingBase class QueueType(str, Enum): @@ -19,10 +16,6 @@ class Queue(ABC, LoggingBase): def deployment_name() -> str: pass - # @property - # def cache_client(self) -> Cache: - # return self._cache_client - @property def region(self): return self._region @@ -40,23 +33,17 @@ def __init__( benchmark: str, queue_type: QueueType, region: str - # cache_client: Optional[Cache], - # resources: Optional[Resources] ): super().__init__() - self._name = None - if (queue_type == QueueType.RESULT): + self._name = benchmark + + # Convention: the trigger queue carries the name of the function. The + # result queue carries the name of the function + "-result". + if (queue_type == QueueType.RESULT and not benchmark.endswith("-result")): self._name = "{}-{}".format(benchmark, queue_type) - else: - self._name = benchmark - # TODO(oana) maybe think of a better way - if (benchmark.endswith("-result") or benchmark.endswith("-trigger")): - self._name = benchmark + self._queue_type = queue_type - # self._cache_client = cache_client - self._cached = False self._region = region - # self._cloud_resources = resources @abstractmethod def create_queue(self): diff --git a/sebs/gcp/queue.py b/sebs/gcp/queue.py index 41a9ea27..1d03e12f 100644 --- a/sebs/gcp/queue.py +++ b/sebs/gcp/queue.py @@ -37,16 +37,12 @@ def __init__( self, benchmark: str, queue_type: QueueType, - # cache_client: Cache, - # resources: Resources, region: str ): super().__init__( benchmark, queue_type, region - # cache_client, - # resources ) self.client = pubsub_v1.PublisherClient() self._subscription_client = pubsub_v1.SubscriberClient() From e7e29feb51c94d2e4a931b1f31af9c885859350f Mon Sep 17 00:00:00 2001 From: orosca Date: Mon, 9 Sep 2024 09:47:22 +0200 Subject: [PATCH 5/7] Remove unused imports --- requirements.gcp.txt | 2 +- sebs/aws/queue.py | 5 +---- sebs/azure/queue.py | 3 --- sebs/gcp/queue.py | 5 ----- 4 files changed, 2 insertions(+), 13 deletions(-) diff --git a/requirements.gcp.txt b/requirements.gcp.txt index d52de146..16dd0ed3 100644 --- a/requirements.gcp.txt +++ b/requirements.gcp.txt @@ -4,4 +4,4 @@ google-api-python-client==1.12.5 google-cloud-monitoring==2.0.0 google-api-python-client-stubs google-cloud-logging==2.0.0 -google-cloud-pubsub=2.23.0 +google-cloud-pubsub==2.23.0 diff --git a/sebs/aws/queue.py b/sebs/aws/queue.py index 2448445e..ebc32345 100644 --- a/sebs/aws/queue.py +++ b/sebs/aws/queue.py @@ -1,7 +1,4 @@ -from typing import Optional, cast -from sebs.aws.aws import AWS -from sebs.cache import Cache -from sebs.faas.config import Resources +from typing import Optional from sebs.faas.queue import Queue, QueueType import boto3 diff --git a/sebs/azure/queue.py b/sebs/azure/queue.py index 8299f4d0..e0f67ec9 100644 --- a/sebs/azure/queue.py +++ b/sebs/azure/queue.py @@ -1,10 +1,7 @@ -from sebs.cache import Cache -from sebs.faas.config import Resources from sebs.faas.queue import Queue, QueueType from azure.core.exceptions import ResourceExistsError from azure.identity import DefaultAzureCredential -from azure.storage.blob import BlobServiceClient from azure.storage.queue import QueueClient diff --git a/sebs/gcp/queue.py b/sebs/gcp/queue.py index 1d03e12f..189fafef 100644 --- a/sebs/gcp/queue.py +++ b/sebs/gcp/queue.py @@ -1,8 +1,3 @@ -from typing import Optional -from googleapiclient.discovery import build - -from sebs.cache import Cache -from sebs.faas.config import Resources from sebs.faas.queue import Queue, QueueType from google.api_core import retry From 32deedbde174091f839d55a9472e749f0e23dd84 Mon Sep 17 00:00:00 2001 From: orosca Date: Mon, 9 Sep 2024 09:51:26 +0200 Subject: [PATCH 6/7] Fix library version --- requirements.gcp.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.gcp.txt b/requirements.gcp.txt index 16dd0ed3..4550ac88 100644 --- a/requirements.gcp.txt +++ b/requirements.gcp.txt @@ -4,4 +4,4 @@ google-api-python-client==1.12.5 google-cloud-monitoring==2.0.0 google-api-python-client-stubs google-cloud-logging==2.0.0 -google-cloud-pubsub==2.23.0 +google-cloud-pubsub From 86540afa7f72f9dfcee87b89d3faf3e0082885d1 Mon Sep 17 00:00:00 2001 From: orosca Date: Mon, 9 Sep 2024 09:55:21 +0200 Subject: [PATCH 7/7] Fix linting --- sebs/azure/queue.py | 2 +- sebs/faas/queue.py | 1 + sebs/gcp/queue.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sebs/azure/queue.py b/sebs/azure/queue.py index e0f67ec9..8fb82fde 100644 --- a/sebs/azure/queue.py +++ b/sebs/azure/queue.py @@ -18,7 +18,7 @@ def deployment_name(): def storage_account(self) -> str: assert self._storage_account return self._storage_account - + @property def account_url(self) -> str: return f"https://{self.storage_account}.queue.core.windows.net" diff --git a/sebs/faas/queue.py b/sebs/faas/queue.py index b0b5b2ca..c22975f0 100644 --- a/sebs/faas/queue.py +++ b/sebs/faas/queue.py @@ -4,6 +4,7 @@ from sebs.utils import LoggingBase + class QueueType(str, Enum): TRIGGER = "trigger" RESULT = "result" diff --git a/sebs/gcp/queue.py b/sebs/gcp/queue.py index 189fafef..27f09ce1 100644 --- a/sebs/gcp/queue.py +++ b/sebs/gcp/queue.py @@ -62,7 +62,7 @@ def create_queue(self): # GCP additionally needs a 'subscription' resource which is the # actual receiver of the messages. It is constructed and destructed # alongside the topic at all times. - self.logging.info(f"Creating queue subscription") + self.logging.info("Creating queue subscription") try: self.subscription_client.create_subscription( name=self.subscription_name,