Skip to content

Commit

Permalink
Transform DeleteNode into its own class
Browse files Browse the repository at this point in the history
  • Loading branch information
mcopik committed May 1, 2023
1 parent 107aaa5 commit 349b066
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 139 deletions.
95 changes: 92 additions & 3 deletions functions/aws/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ def lock_and_read(self, system_storage: SystemStorage) -> Tuple[bool, dict]:
self._parent_timestamp: Optional[int] = None
while True:
self._parent_timestamp = int(datetime.now().timestamp())
_, self._parent_node = system_storage.lock_node(
parent_lock, self._parent_node = system_storage.lock_node(
str(parent_path), self._parent_timestamp
)
if not lock:
if not parent_lock:
sleep(1)
else:
break
Expand Down Expand Up @@ -282,14 +282,103 @@ def commit_and_unlock(self, system_storage: SystemStorage) -> Tuple[bool, dict]:
return (True, {})


class DeleteNodeExecutor(Executor):
def __init__(self, op: DeleteNode):
super().__init__(op)

@property
def op(self) -> DeleteNode:
return cast(DeleteNode, self._op)

def lock_and_read(self, system_storage: SystemStorage) -> Tuple[bool, dict]:

path = self.op.path
logging.info(f"Attempting to delete node at {path}")

# FIXME :limit number of attempts
while True:
self._timestamp = int(datetime.now().timestamp())
lock, self._node = system_storage.lock_node(path, self._timestamp)
if not lock:
sleep(2)
else:
break

# does the node not exist?
if self._node is None:
system_storage.unlock_node(path, self._timestamp)
return (
False,
{"status": "failure", "path": path, "reason": "node_doesnt_exist"},
)

if len(self._node.children):
system_storage.unlock_node(path, self._timestamp)
return (False, {"status": "failure", "path": path, "reason": "not_empty"})

# lock the parent - unless we're already at the root
node_path = pathlib.Path(path)
parent_path = node_path.parent.absolute()
self._parent_timestamp: Optional[int] = None
while True:
self._parent_timestamp = int(datetime.now().timestamp())
parent_lock, self._parent_node = system_storage.lock_node(
str(parent_path), self._parent_timestamp
)
if not parent_lock:
sleep(2)
else:
break
assert self._parent_node

return (True, {})

def distributor_push(self, client: Client, distributor_queue: DistributorQueue):

assert self._counter
assert self._node
assert self._parent_node

assert distributor_queue
distributor_queue.push(
self._counter,
DistributorDeleteNode(client.session_id, self._node, self._parent_node),
client,
)

def commit_and_unlock(self, system_storage: SystemStorage) -> Tuple[bool, dict]:

assert self._node
assert self._timestamp
assert self._parent_node
assert self._parent_timestamp

# FIXME: we shouldn't use writer ID anymore
self._counter = system_storage.increase_system_counter(0)
if self._counter is None:
return (False, {"status": "failure", "reason": "unknown"})

# remove child from parent node
self._parent_node.children.remove(pathlib.Path(self.op.path).name)

# commit system storage
# FIXME: as a transaction
system_storage.commit_node(
self._parent_node, self._parent_timestamp, set([NodeDataType.CHILDREN])
)
system_storage.delete_node(self._node, self._timestamp)

return (True, {})


def builder(
operation: str, event_id: str, event: dict
) -> Tuple[Optional[Executor], dict]:

ops: Dict[str, Tuple[Type[RequestOperation], Type[Executor]]] = {
"create_node": (CreateNode, CreateNodeExecutor),
"set_data": (SetData, SetDataExecutor),
# "delete_node": (DeleteNode, DeleteNodeExecutor),
"delete_node": (DeleteNode, DeleteNodeExecutor),
"deregister_session": (DeregisterSession, DeregisterSessionExecutor),
}

Expand Down
137 changes: 1 addition & 136 deletions functions/aws/writer.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,13 @@
import json
import logging
import pathlib
import time
from datetime import datetime
from time import sleep
from typing import Callable, Dict, List, Optional
from typing import Optional

from faaskeeper.node import Node, NodeDataType
from faaskeeper.stats import StorageStatistics
from faaskeeper.version import Version
from functions.aws.config import Config
from functions.aws.control.channel import Client
from functions.aws.control.distributor_events import (
DistributorCreateNode,
DistributorDeleteNode,
DistributorSetData,
)
from functions.aws.operations import Executor
from functions.aws.operations import builder as operations_builder

mandatory_event_fields = [
"op",
"path",
"session_id",
"version",
"data",
]

config = Config.instance()

repetitions = 0
Expand All @@ -37,41 +18,6 @@
sum_push = 0.0


def verify_event(id: str, write_event: dict, flags: List[str] = None) -> bool:

events = []
if flags is not None:
events = [*mandatory_event_fields, *flags]
else:
events = [*mandatory_event_fields]

"""
Handle malformed events correctly.
"""
if any(k not in write_event.keys() for k in events):
logging.error(
"Incorrect event with ID {id}, timestamp {timestamp}".format(
id=id, timestamp=write_event["timestamp"]
)
)
return False
return True


# FIXME: proper config
WRITER_ID = 0

"""
The function has the following responsibilities:
1) Create new node, returning success or failure if the node exists
2) Set-up ACL permission on the node.
3) Add the list to user's nodelist in case of an ephemeral node.
4) Create sequential node by appending newest version.
5) Create parents nodes to make sure the entire path exists.
6) Look-up watches in a seperate table.
"""


def execute_operation(op_exec: Executor, client: Client) -> Optional[dict]:

try:
Expand Down Expand Up @@ -99,87 +45,6 @@ def execute_operation(op_exec: Executor, client: Client) -> Optional[dict]:
return {"status": "failure", "reason": "unknown"}


def delete_node(client: Client, id: str, write_event: dict) -> Optional[dict]:

# if not verify_event(id, write_event, verbose_output, ["flags"]):
# return None

try:
# TODO: ephemeral
# TODO: sequential
path = get_object(write_event["path"])
logging.info(f"Attempting to create node at {path}")

# FIXME :limit number of attempts
while True:
timestamp = int(datetime.now().timestamp())
lock, node = config.system_storage.lock_node(path, timestamp)
if not lock:
sleep(2)
else:
break

# does the node not exist?
if node is None:
config.system_storage.unlock_node(path, timestamp)
return {"status": "failure", "path": path, "reason": "node_doesnt_exist"}

if len(node.children):
config.system_storage.unlock_node(path, timestamp)
return {"status": "failure", "path": path, "reason": "not_empty"}

# lock the parent - unless we're already at the root
node_path = pathlib.Path(path)
parent_path = node_path.parent.absolute()
parent_timestamp: Optional[int] = None
while True:
parent_timestamp = int(datetime.now().timestamp())
parent_lock, parent_node = config.system_storage.lock_node(
str(parent_path), parent_timestamp
)
if not lock:
sleep(2)
else:
break
assert parent_node

counter = config.system_storage.increase_system_counter(WRITER_ID)
if counter is None:
return {"status": "failure", "reason": "unknown"}

# remove child from parent node
parent_node.children.remove(pathlib.Path(path).name)

# commit system storage
config.system_storage.commit_node(
parent_node, parent_timestamp, set([NodeDataType.CHILDREN])
)
config.system_storage.delete_node(node, timestamp)

assert config.distributor_queue
config.distributor_queue.push(
counter, DistributorDeleteNode(client.session_id, node, parent_node), client
)
return None
except Exception:
# Report failure to the user
print("Failure!")
import traceback

traceback.print_exc()
return {"status": "failure", "reason": "unknown"}


ops: Dict[str, Callable[[Client, str, dict], Optional[dict]]] = {
# "create_node": create_node,
# "set_data": set_data,
"delete_node": delete_node,
# "deregister_session": deregister_session,
}


# def get_object(obj: dict):
# return next(iter(obj.values()))
def get_object(obj: dict):
return next(iter(obj.values()))

Expand Down

0 comments on commit 349b066

Please sign in to comment.