Skip to content

Commit

Permalink
File rename, drag, delete (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
rsamf authored Jul 22, 2024
1 parent 4147f76 commit 0d7bb37
Show file tree
Hide file tree
Showing 10 changed files with 511 additions and 285 deletions.
27 changes: 26 additions & 1 deletion graphbook/dataloading.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ def _handle_queues(self):
not q.empty() and self.total_consumer_size < MAX_RESULT_QUEUE_SIZE
):
result, consumer_id = q.get(False)
if consumer_id not in consumers:
continue
consumers[consumer_id].put(result, block=False)
self.total_consumer_size += 1

def get_all_sizes(self):
return {
"load": [q.qsize() for q in self._load_queues],
Expand All @@ -175,6 +177,29 @@ def get_all_sizes(self):
"total_consumer_size": self.total_consumer_size,
}

def clear(self):
    """Drain every internal queue and reset the running consumer-size count."""

    def drain(q):
        # Pull items until the queue reports empty; tolerate the race where
        # another process empties it between the check and the get.
        while not q.empty():
            try:
                q.get(False)
            except queue.Empty:
                print("Emptying an empty queue. Is the graph still executing?")
                break

    queue_groups = (
        self._load_queues,
        self._dump_queues,
        self._load_result_queues,
        self._dump_result_queues,
        self.consumer_load_queues,
        self.consumer_dump_queues,
    )
    for group in queue_groups:
        for q in group:
            drain(q)
    self.total_consumer_size = 0

def put_load(
self, items: list, record_id: int, load_fn: callable, consumer_id: int
):
Expand Down
4 changes: 3 additions & 1 deletion graphbook/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(
)
self.is_running = False

def exec_step(self, step: Step, input: Note = None, flush: bool = False):
def exec_step(self, step: Step, input: Note | None = None, flush: bool = False):
outputs = {}
step_fn = step if not flush else step.all
start_time = time.time()
Expand Down Expand Up @@ -181,6 +181,8 @@ async def start_loop(self):
elif work["cmd"] == "clear":
if self.try_update_state(work):
self.graph_state.clear_outputs(work.get("step_id"))
self.view_manager.handle_clear(work.get("step_id"))
self.dataloader.clear()
except KeyboardInterrupt:
self.cleanup()
break
Expand Down
37 changes: 21 additions & 16 deletions graphbook/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ async def get_output_note(request: web.Request) -> web.Response:
@routes.get(r"/fs/{path:.+}")
def get(request: web.Request):
path = request.match_info.get("path", "")
fullpath = osp.join(root_path, path)
fullpath = osp.join(abs_root_path, path)
assert fullpath.startswith(
root_path
), f"{fullpath} must be within {root_path}"
abs_root_path
), f"{fullpath} must be within {abs_root_path}"

def handle_fs_tree(p: str, fn: callable) -> dict:
if osp.isdir(p):
Expand All @@ -207,9 +207,11 @@ def handle_fs_tree(p: str, fn: callable) -> dict:

def get_stat(path):
stat = os.stat(path)
rel_path = osp.relpath(path, abs_root_path)
st = {
"title": osp.basename(path),
"path": path,
"title": osp.basename(rel_path),
"path": rel_path,
"dirname": osp.dirname(rel_path),
"from_root": abs_root_path,
"access_time": int(stat.st_atime),
"modification_time": int(stat.st_mtime),
Expand Down Expand Up @@ -252,11 +254,16 @@ def get_stat(path):
)

@routes.put("/fs")
@routes.put(r"/fs/{path:[\w\d\./\-\+]+}")
@routes.put(r"/fs/{path:.+}")
async def put(request: web.Request):
path = request.match_info.get("path")
fullpath = osp.join(root_path, path)
data = await request.json()
if request.query.get("mv"):
topath = osp.join(root_path, request.query.get("mv"))
os.rename(fullpath, topath)
return web.json_response({}, status=200)

is_file = data.get("is_file", False)
file_contents = data.get("file_contents", "")
hash_key = data.get("hash_key", "")
Expand All @@ -282,29 +289,27 @@ async def put(request: web.Request):
f.write(file_contents)
return web.json_response({}, status=201)

@routes.delete("/fs/{path}")
@routes.delete("/fs/{path:.+}")
def delete(request):
path = request.match_info["path"]
path_components = path.split("/")
if "." in path_components or ".." in path_components:
return web.json_response(
{"reason": "Path must be absolute."}, status=400
)
path = request.match_info.get("path")
fullpath = osp.join(root_path, path)
assert fullpath.startswith(
root_path
), f"{fullpath} must be within {root_path}"

fullpath = "%s/%s" % (root_path, path)
if osp.exists(fullpath):
if osp.isdir(fullpath):
if os.listdir(fullpath) == []:
os.rmdir(fullpath)
return web.json_response(status=204)
return web.json_response({"success": True}, status=204)
else:
return web.json_response(
{"reason": "/%s: Directory is not empty." % path},
status=403,
)
else:
os.remove(fullpath)
return web.json_response(status=204)
return web.json_response({"success": True}, status=204)
else:
return web.json_response(
{"reason": "/%s: No such file or directory." % path}, status=404
Expand Down
8 changes: 2 additions & 6 deletions graphbook/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ def get_nodes(self) -> Tuple[dict, dict]:


class GraphState:
def __init__(
self,
custom_nodes_path: str,
view_manager_queue: mp.Queue
):
def __init__(self, custom_nodes_path: str, view_manager_queue: mp.Queue):
sys.path.append(custom_nodes_path)
self.custom_nodes_path = custom_nodes_path
self.view_manager_queue = view_manager_queue
Expand Down Expand Up @@ -289,7 +285,7 @@ def handle_outputs(self, step_id: str, outputs: Outputs):
self._step_states[step_id].add(StepState.EXECUTED_THIS_RUN)
self.view_manager.handle_queue_size(step_id, self._queues[step_id].dict_sizes())

def clear_outputs(self, step_id: str = None):
def clear_outputs(self, step_id: str | None = None):
if step_id is None:
for q in self._queues.values():
q.clear()
Expand Down
18 changes: 18 additions & 0 deletions graphbook/viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def handle_start(self, node_id: str):
def handle_end(self):
pass

def handle_clear(self, node_id: str):
    """Notification hook: a node's outputs were cleared. Default is a no-op."""

def get_next(self):
return None

Expand All @@ -52,6 +55,12 @@ def handle_outputs(self, node_id: str, output: dict):
new_entries = {k: v[0].items for k, v in output.items() if len(v) > 0}
self.last_outputs[node_id] |= new_entries

def handle_clear(self, node_id: str | None = None):
if node_id is None:
self.last_outputs = {}
if node_id in self.last_outputs:
del self.last_outputs[node_id]

def get_next(self):
return self.last_outputs

Expand Down Expand Up @@ -262,6 +271,10 @@ def handle_start(self, node_id: str):
for viewer in self.viewers:
viewer.handle_start(node_id)

def handle_clear(self, node_id: str | None):
for viewer in self.viewers:
viewer.handle_clear(node_id)

def handle_log(self, node_id: str, log: str, type: str):
self.logs_viewer.handle_log(node_id, log, type)

Expand Down Expand Up @@ -300,6 +313,8 @@ def _loop(self):
self.handle_log(work["node_id"], work["log"], work["type"])
elif work["cmd"] == "handle_run_state":
self.handle_run_state(work["is_running"])
elif work["cmd"] == "handle_clear":
self.handle_clear(work["node_id"])
except queue.Empty:
pass

Expand Down Expand Up @@ -349,6 +364,9 @@ def handle_run_state(self, is_running: bool):
{"cmd": "handle_run_state", "is_running": is_running}
)

def handle_clear(self, node_id: str | None):
self.view_manager_queue.put({"cmd": "handle_clear", "node_id": node_id})


class Logger:
def __init__(self, view_manager_queue: mp.Queue, node_id: str, node_name: str):
Expand Down
30 changes: 27 additions & 3 deletions web/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ export class ServerAPI {
}
}

private async delete(path): Promise<any> {
try {
const response = await fetch(`http://${this.host}/${path}`, {
method: 'DELETE',
headers: {
'Content-Type': 'application/json'
}
});
if (response.ok) {
return await response.json();
}
} catch (e) {
console.error(e);
return null;
}
}

public addWsEventListener(eventType: string, callback: EventListenerOrEventListenerObject) {
this.listeners.add([eventType, callback]);
Expand Down Expand Up @@ -207,18 +223,26 @@ export class ServerAPI {
return workflowFiles;
}

public async putFile(filepath, isFile, content = null, hash_key = null) {
public async putFile(filepath: string, isFile: boolean, content: string = '', hash_key: string = '') {
return await this.put(`fs/${filepath}`, {
is_file: isFile,
file_contents: content ?? '',
hash_key: hash_key ?? ''
file_contents: content,
hash_key: hash_key
});
}

public async mvFile(oldPath, newPath) {
return await this.put(`fs/${oldPath}?mv=${newPath}`, {});
}

public async getFile(filepath) {
return await this.get(`fs/${filepath}`);
}

public async rmFile(filepath) {
return await this.delete(`fs/${filepath}`);
}

public async getSubflowFromFile(filepath) {
const res = await this.getFile(filepath);
if (res?.content) {
Expand Down
Loading

0 comments on commit 0d7bb37

Please sign in to comment.