diff --git a/functions/aws/operations.py b/functions/aws/operations.py index c7b0bbd..c69c81d 100644 --- a/functions/aws/operations.py +++ b/functions/aws/operations.py @@ -77,10 +77,10 @@ def lock_and_read(self, system_storage: SystemStorage) -> Tuple[bool, dict]: self._parent_timestamp: Optional[int] = None while True: self._parent_timestamp = int(datetime.now().timestamp()) - _, self._parent_node = system_storage.lock_node( + parent_lock, self._parent_node = system_storage.lock_node( str(parent_path), self._parent_timestamp ) - if not lock: + if not parent_lock: sleep(1) else: break @@ -282,6 +282,95 @@ def commit_and_unlock(self, system_storage: SystemStorage) -> Tuple[bool, dict]: return (True, {}) +class DeleteNodeExecutor(Executor): + def __init__(self, op: DeleteNode): + super().__init__(op) + + @property + def op(self) -> DeleteNode: + return cast(DeleteNode, self._op) + + def lock_and_read(self, system_storage: SystemStorage) -> Tuple[bool, dict]: + + path = self.op.path + logging.info(f"Attempting to delete node at {path}") + + # FIXME :limit number of attempts + while True: + self._timestamp = int(datetime.now().timestamp()) + lock, self._node = system_storage.lock_node(path, self._timestamp) + if not lock: + sleep(2) + else: + break + + # does the node not exist? + if self._node is None: + system_storage.unlock_node(path, self._timestamp) + return ( + False, + {"status": "failure", "path": path, "reason": "node_doesnt_exist"}, + ) + + if len(self._node.children): + system_storage.unlock_node(path, self._timestamp) + return (False, {"status": "failure", "path": path, "reason": "not_empty"}) + + # lock the parent - unless we're already at the root + node_path = pathlib.Path(path) + parent_path = node_path.parent.absolute() + self._parent_timestamp: Optional[int] = None + while True: + self._parent_timestamp = int(datetime.now().timestamp()) + parent_lock, self._parent_node = system_storage.lock_node( + str(parent_path), self._parent_timestamp + ) + if not parent_lock: + sleep(2) + else: + break + assert self._parent_node + + return (True, {}) + + def distributor_push(self, client: Client, distributor_queue: DistributorQueue): + + assert self._counter + assert self._node + assert self._parent_node + + assert distributor_queue + distributor_queue.push( + self._counter, + DistributorDeleteNode(client.session_id, self._node, self._parent_node), + client, + ) + + def commit_and_unlock(self, system_storage: SystemStorage) -> Tuple[bool, dict]: + + assert self._node + assert self._timestamp + assert self._parent_node + assert self._parent_timestamp + + # FIXME: we shouldn't use writer ID anymore + self._counter = system_storage.increase_system_counter(0) + if self._counter is None: + return (False, {"status": "failure", "reason": "unknown"}) + + # remove child from parent node + self._parent_node.children.remove(pathlib.Path(self.op.path).name) + + # commit system storage + # FIXME: as a transaction + system_storage.commit_node( + self._parent_node, self._parent_timestamp, set([NodeDataType.CHILDREN]) + ) + system_storage.delete_node(self._node, self._timestamp) + + return (True, {}) + + def builder( operation: str, event_id: str, event: dict ) -> Tuple[Optional[Executor], dict]: @@ -289,7 +378,7 @@ def builder( ops: Dict[str, Tuple[Type[RequestOperation], Type[Executor]]] = { "create_node": (CreateNode, CreateNodeExecutor), "set_data": (SetData, SetDataExecutor), - # "delete_node": (DeleteNode, DeleteNodeExecutor), + "delete_node": (DeleteNode, DeleteNodeExecutor), "deregister_session": (DeregisterSession, DeregisterSessionExecutor), } diff --git a/functions/aws/writer.py b/functions/aws/writer.py index 8dce29b..f9db413 100644 --- a/functions/aws/writer.py +++ b/functions/aws/writer.py @@ -1,32 +1,13 @@ import json import logging -import pathlib -import time -from datetime import datetime -from time import sleep -from typing import Callable, Dict, List, Optional +from typing import Optional -from faaskeeper.node import Node, NodeDataType from faaskeeper.stats import StorageStatistics -from faaskeeper.version import Version from functions.aws.config import Config from functions.aws.control.channel import Client -from functions.aws.control.distributor_events import ( - DistributorCreateNode, - DistributorDeleteNode, - DistributorSetData, -) from functions.aws.operations import Executor from functions.aws.operations import builder as operations_builder -mandatory_event_fields = [ - "op", - "path", - "session_id", - "version", - "data", -] - config = Config.instance() repetitions = 0 @@ -37,41 +18,6 @@ sum_push = 0.0 -def verify_event(id: str, write_event: dict, flags: List[str] = None) -> bool: - - events = [] - if flags is not None: - events = [*mandatory_event_fields, *flags] - else: - events = [*mandatory_event_fields] - - """ - Handle malformed events correctly. - """ - if any(k not in write_event.keys() for k in events): - logging.error( - "Incorrect event with ID {id}, timestamp {timestamp}".format( - id=id, timestamp=write_event["timestamp"] - ) - ) - return False - return True - - -# FIXME: proper config -WRITER_ID = 0 - -""" - The function has the following responsibilities: - 1) Create new node, returning success or failure if the node exists - 2) Set-up ACL permission on the node. - 3) Add the list to user's nodelist in case of an ephemeral node. - 4) Create sequential node by appending newest version. - 5) Create parents nodes to make sure the entire path exists. - 6) Look-up watches in a seperate table. -""" - - def execute_operation(op_exec: Executor, client: Client) -> Optional[dict]: try: @@ -99,87 +45,6 @@ def execute_operation(op_exec: Executor, client: Client) -> Optional[dict]: return {"status": "failure", "reason": "unknown"} -def delete_node(client: Client, id: str, write_event: dict) -> Optional[dict]: - - # if not verify_event(id, write_event, verbose_output, ["flags"]): - # return None - - try: - # TODO: ephemeral - # TODO: sequential - path = get_object(write_event["path"]) - logging.info(f"Attempting to create node at {path}") - - # FIXME :limit number of attempts - while True: - timestamp = int(datetime.now().timestamp()) - lock, node = config.system_storage.lock_node(path, timestamp) - if not lock: - sleep(2) - else: - break - - # does the node not exist? - if node is None: - config.system_storage.unlock_node(path, timestamp) - return {"status": "failure", "path": path, "reason": "node_doesnt_exist"} - - if len(node.children): - config.system_storage.unlock_node(path, timestamp) - return {"status": "failure", "path": path, "reason": "not_empty"} - - # lock the parent - unless we're already at the root - node_path = pathlib.Path(path) - parent_path = node_path.parent.absolute() - parent_timestamp: Optional[int] = None - while True: - parent_timestamp = int(datetime.now().timestamp()) - parent_lock, parent_node = config.system_storage.lock_node( - str(parent_path), parent_timestamp - ) - if not lock: - sleep(2) - else: - break - assert parent_node - - counter = config.system_storage.increase_system_counter(WRITER_ID) - if counter is None: - return {"status": "failure", "reason": "unknown"} - - # remove child from parent node - parent_node.children.remove(pathlib.Path(path).name) - - # commit system storage - config.system_storage.commit_node( - parent_node, parent_timestamp, set([NodeDataType.CHILDREN]) - ) - config.system_storage.delete_node(node, timestamp) - - assert config.distributor_queue - config.distributor_queue.push( - counter, DistributorDeleteNode(client.session_id, node, parent_node), client - ) - return None - except Exception: - # Report failure to the user - print("Failure!") - import traceback - - traceback.print_exc() - return {"status": "failure", "reason": "unknown"} - - -ops: Dict[str, Callable[[Client, str, dict], Optional[dict]]] = { - # "create_node": create_node, - # "set_data": set_data, - "delete_node": delete_node, - # "deregister_session": deregister_session, -} - - -# def get_object(obj: dict): -# return next(iter(obj.values())) def get_object(obj: dict): return next(iter(obj.values()))