diff --git a/requirements.gcp.txt b/requirements.gcp.txt
index 9cb90916..4550ac88 100644
--- a/requirements.gcp.txt
+++ b/requirements.gcp.txt
@@ -4,3 +4,4 @@ google-api-python-client==1.12.5
 google-cloud-monitoring==2.0.0
 google-api-python-client-stubs
 google-cloud-logging==2.0.0
+google-cloud-pubsub
diff --git a/sebs/aws/queue.py b/sebs/aws/queue.py
new file mode 100644
index 00000000..ebc32345
--- /dev/null
+++ b/sebs/aws/queue.py
@@ -0,0 +1,117 @@
+from typing import Optional
+from sebs.faas.queue import Queue, QueueType
+
+import boto3
+
+
+class SQS(Queue):
+    @staticmethod
+    def typename() -> str:
+        return "AWS.SQS"
+
+    @staticmethod
+    def deployment_name():
+        return "aws"
+
+    @property
+    def queue_url(self):
+        return self._queue_url
+
+    @property
+    def queue_arn(self):
+        return self._queue_arn
+
+    def __init__(
+        self,
+        benchmark: str,
+        queue_type: QueueType,
+        region: str,
+        queue_url: Optional[str] = None,
+        queue_arn: Optional[str] = None
+    ):
+        super().__init__(
+            benchmark,
+            queue_type,
+            region
+        )
+        self._queue_url = queue_url
+        self._queue_arn = queue_arn
+
+        self.client = boto3.session.Session().client(
+            'sqs',
+            region_name=self.region
+        )
+
+    def create_queue(self):
+        self.logging.debug(f"Creating queue {self.name}")
+
+        if (self._queue_url and self._queue_arn):
+            self.logging.debug("Queue already exists, reusing...")
+            return
+
+        self._queue_url = self.client.create_queue(QueueName=self.name)["QueueUrl"]
+        self._queue_arn = self.client.get_queue_attributes(
+            QueueUrl=self.queue_url,
+            AttributeNames=["QueueArn"],
+        )["Attributes"]["QueueArn"]
+
+        self.logging.debug("Created queue")
+
+    def remove_queue(self):
+        self.logging.info(f"Deleting queue {self.name}")
+
+        self.client.delete_queue(QueueUrl=self.queue_url)
+
+        self.logging.info("Deleted queue")
+
+    def send_message(self, serialized_message: str):
+        self.client.send_message(
+            QueueUrl=self.queue_url,
+            MessageBody=serialized_message,
+        )
+        self.logging.info(f"Sent message to queue {self.name}")
+
+    def receive_message(self) -> str:
+        self.logging.info(f"Pulling a message from {self.name}")
+
+        response = self.client.receive_message(
+            QueueUrl=self.queue_url,
+            AttributeNames=["SentTimestamp"],
+            MaxNumberOfMessages=1,
+            MessageAttributeNames=["All"],
+            WaitTimeSeconds=5,
+        )
+
+        if ("Messages" not in response):
+            self.logging.info("No messages to be received")
+            return ""
+
+        self.logging.info(f"Received a message from {self.name}")
+
+        # Delete the message from the queue - serves as an acknowledgement
+        # that it was received.
+        self.client.delete_message(
+            QueueUrl=self.queue_url,
+            ReceiptHandle=response["Messages"][0]["ReceiptHandle"],
+        )
+
+        return response["Messages"][0]["Body"]
+
+    def serialize(self) -> dict:
+        return {
+            "name": self.name,
+            "type": self.queue_type,
+            "region": self.region,
+            "queue_url": self.queue_url,
+            "queue_arn": self.queue_arn,
+        }
+
+    @staticmethod
+    def deserialize(obj: dict) -> "SQS":
+        return SQS(
+            obj["name"],
+            obj["type"],
+            obj["region"],
+            obj["queue_url"],
+            obj["queue_arn"]
+        )
diff --git a/sebs/azure/queue.py b/sebs/azure/queue.py
new file mode 100644
index 00000000..8fb82fde
--- /dev/null
+++ b/sebs/azure/queue.py
@@ -0,0 +1,95 @@
+from sebs.faas.queue import Queue, QueueType
+
+from azure.core.exceptions import ResourceExistsError
+from azure.identity import DefaultAzureCredential
+from azure.storage.queue import QueueClient
+
+
+class AzureQueue(Queue):
+    @staticmethod
+    def typename() -> str:
+        return "Azure.Queue"
+
+    @staticmethod
+    def deployment_name():
+        return "azure"
+
+    @property
+    def storage_account(self) -> str:
+        assert self._storage_account
+        return self._storage_account
+
+    @property
+    def account_url(self) -> str:
+        return f"https://{self.storage_account}.queue.core.windows.net"
+
+    def __init__(
+        self,
+        benchmark: str,
+        queue_type: QueueType,
+        storage_account: str,
+        region: str
+    ):
+        default_credential = DefaultAzureCredential()
+
+        super().__init__(
+            benchmark,
+            queue_type,
+            region
+        )
+        self._storage_account = storage_account
+        self.client = QueueClient(self.account_url,
+                                  queue_name=self.name,
+                                  credential=default_credential)
+
+    def create_queue(self):
+        self.logging.info(f"Creating queue {self.name}")
+
+        try:
+            self.client.create_queue()
+            self.logging.info("Created queue")
+        except ResourceExistsError:
+            self.logging.info("Queue already exists, reusing...")
+
+    def remove_queue(self):
+        self.logging.info(f"Deleting queue {self.name}")
+
+        self.client.delete_queue()
+
+        self.logging.info("Deleted queue")
+
+    def send_message(self, serialized_message: str):
+        self.client.send_message(serialized_message)
+        self.logging.info(f"Sent message to queue {self.name}")
+
+    def receive_message(self) -> str:
+        self.logging.info(f"Pulling a message from {self.name}")
+
+        response = self.client.receive_messages(
+            max_messages=1,
+            timeout=5,
+        )
+
+        for msg in response:
+            self.client.delete_message(msg)
+            return msg.content
+
+        self.logging.info("No messages to be received")
+        return ""
+
+    def serialize(self) -> dict:
+        return {
+            "name": self.name,
+            "type": self.queue_type,
+            "storage_account": self.storage_account,
+            "region": self.region
+        }
+
+    @staticmethod
+    def deserialize(obj: dict) -> "AzureQueue":
+        return AzureQueue(
+            obj["name"],
+            obj["type"],
+            obj["storage_account"],
+            obj["region"]
+        )
diff --git a/sebs/faas/queue.py b/sebs/faas/queue.py
new file mode 100644
index 00000000..c22975f0
--- /dev/null
+++ b/sebs/faas/queue.py
@@ -0,0 +1,63 @@
+from abc import ABC
+from abc import abstractmethod
+from enum import Enum
+
+from sebs.utils import LoggingBase
+
+
+class QueueType(str, Enum):
+    TRIGGER = "trigger"
+    RESULT = "result"
+
+
+class Queue(ABC, LoggingBase):
+
+    @staticmethod
+    @abstractmethod
+    def deployment_name() -> str:
+        pass
+
+    @property
+    def region(self):
+        return self._region
+
+    @property
+    def queue_type(self):
+        return self._queue_type
+
+    @property
+    def name(self):
+        return self._name
+
+    def __init__(
+        self,
+        benchmark: str,
+        queue_type: QueueType,
+        region: str
+    ):
+        super().__init__()
+        self._name = benchmark
+
+        # Convention: the trigger queue carries the name of the function. The
+        # result queue carries the name of the function + "-result".
+        if (queue_type == QueueType.RESULT and not benchmark.endswith("-result")):
+            self._name = "{}-{}".format(benchmark, queue_type.value)
+
+        self._queue_type = queue_type
+        self._region = region
+
+    @abstractmethod
+    def create_queue(self):
+        pass
+
+    @abstractmethod
+    def remove_queue(self):
+        pass
+
+    @abstractmethod
+    def send_message(self, serialized_message: str):
+        pass
+
+    @abstractmethod
+    def receive_message(self) -> str:
+        pass
diff --git a/sebs/gcp/queue.py b/sebs/gcp/queue.py
new file mode 100644
index 00000000..27f09ce1
--- /dev/null
+++ b/sebs/gcp/queue.py
@@ -0,0 +1,124 @@
+from sebs.faas.queue import Queue, QueueType
+
+from google.api_core import retry
+from google.api_core.exceptions import AlreadyExists
+from google.cloud import pubsub_v1
+
+import os
+
+
+class GCPQueue(Queue):
+    @staticmethod
+    def typename() -> str:
+        return "GCP.Queue"
+
+    @staticmethod
+    def deployment_name():
+        return "gcp"
+
+    @property
+    def topic_name(self):
+        return self._topic_name
+
+    @property
+    def subscription_name(self):
+        return self._subscription_name
+
+    @property
+    def subscription_client(self):
+        return self._subscription_client
+
+    def __init__(
+        self,
+        benchmark: str,
+        queue_type: QueueType,
+        region: str
+    ):
+        super().__init__(
+            benchmark,
+            queue_type,
+            region
+        )
+        self.client = pubsub_v1.PublisherClient()
+        self._subscription_client = pubsub_v1.SubscriberClient()
+
+        self._topic_name = 'projects/{project_id}/topics/{topic}'.format(
+            project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
+            topic=self.name,
+        )
+        self._subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
+            project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
+            sub=self.name,
+        )
+
+    def create_queue(self):
+        self.logging.info(f"Creating queue {self.name}")
+        try:
+            self.client.create_topic(name=self.topic_name)
+            self.logging.info("Created queue")
+        except AlreadyExists:
+            self.logging.info("Queue already exists, reusing...")
+
+        # GCP additionally needs a 'subscription' resource which is the
+        # actual receiver of the messages. It is constructed and destructed
+        # alongside the topic at all times.
+        self.logging.info("Creating queue subscription")
+        try:
+            self.subscription_client.create_subscription(
+                name=self.subscription_name,
+                topic=self.topic_name
+            )
+            self.logging.info("Created queue subscription")
+        except AlreadyExists:
+            self.logging.info("Subscription already exists, reusing...")
+
+    def remove_queue(self):
+        self.logging.info(f"Deleting queue and associated subscription {self.name}")
+
+        self.client.delete_topic(topic=self.topic_name)
+        self.subscription_client.delete_subscription(subscription=self.subscription_name)
+
+        self.logging.info("Deleted queue and associated subscription")
+
+    def send_message(self, serialized_message: str):
+        # Pub/Sub requires the payload as bytes.
+        self.client.publish(self.topic_name, serialized_message.encode("utf-8"))
+        self.logging.info(f"Sent message to queue {self.name}")
+
+    # Receive messages through the 'pull' (sync) method.
+    def receive_message(self) -> str:
+        self.logging.info(f"Pulling a message from {self.name}")
+
+        response = self.subscription_client.pull(
+            subscription=self.subscription_name,
+            max_messages=1,
+            retry=retry.Retry(deadline=5),
+        )
+
+        if (len(response.received_messages) == 0):
+            self.logging.info("No messages to be received")
+            return ""
+
+        # Acknowledge the received message so it is not sent again.
+        received_message = response.received_messages[0]
+        self.subscription_client.acknowledge(
+            subscription=self.subscription_name,
+            ack_ids=[received_message.ack_id],
+        )
+        self.logging.info(f"Received a message from {self.name}")
+
+        return received_message.message.data.decode("utf-8")
+
+    def serialize(self) -> dict:
+        return {
+            "name": self.name,
+            "type": self.queue_type,
+            "region": self.region,
+        }
+
+    @staticmethod
+    def deserialize(obj: dict) -> "GCPQueue":
+        return GCPQueue(
+            obj["name"],
+            obj["type"],
+            obj["region"],
+        )