diff --git a/graphbook/dataloading.py b/graphbook/dataloading.py
index 9c735bb..9316035 100644
--- a/graphbook/dataloading.py
+++ b/graphbook/dataloading.py
@@ -163,9 +163,11 @@ def _handle_queues(self):
                 not q.empty() and self.total_consumer_size < MAX_RESULT_QUEUE_SIZE
             ):
                 result, consumer_id = q.get(False)
+                if consumer_id not in consumers:
+                    continue
                 consumers[consumer_id].put(result, block=False)
                 self.total_consumer_size += 1
-
+
     def get_all_sizes(self):
         return {
             "load": [q.qsize() for q in self._load_queues],
@@ -175,6 +177,29 @@ def get_all_sizes(self):
             "total_consumer_size": self.total_consumer_size,
         }
 
+    def clear(self):
+        def clear_queue(q):
+            while not q.empty():
+                try:
+                    q.get(False)
+                except queue.Empty:
+                    print("Emptying an empty queue. Is the graph still executing?")
+                    break
+
+        for q in self._load_queues:
+            clear_queue(q)
+        for q in self._dump_queues:
+            clear_queue(q)
+        for q in self._load_result_queues:
+            clear_queue(q)
+        for q in self._dump_result_queues:
+            clear_queue(q)
+        for q in self.consumer_load_queues:
+            clear_queue(q)
+        for q in self.consumer_dump_queues:
+            clear_queue(q)
+        self.total_consumer_size = 0
+
     def put_load(
         self, items: list, record_id: int, load_fn: callable, consumer_id: int
     ):
diff --git a/graphbook/processing.py b/graphbook/processing.py
index eaa6f37..decc9f9 100644
--- a/graphbook/processing.py
+++ b/graphbook/processing.py
@@ -40,7 +40,7 @@ def __init__(
         )
         self.is_running = False
 
-    def exec_step(self, step: Step, input: Note = None, flush: bool = False):
+    def exec_step(self, step: Step, input: Note | None = None, flush: bool = False):
         outputs = {}
         step_fn = step if not flush else step.all
         start_time = time.time()
@@ -181,6 +181,8 @@ async def start_loop(self):
                 elif work["cmd"] == "clear":
                     if self.try_update_state(work):
                         self.graph_state.clear_outputs(work.get("step_id"))
+                        self.view_manager.handle_clear(work.get("step_id"))
+                        self.dataloader.clear()
             except KeyboardInterrupt:
                 self.cleanup()
                 break
diff --git a/graphbook/server.py b/graphbook/server.py
index ffd13b4..d5406e8 100644
--- a/graphbook/server.py
+++ b/graphbook/server.py
@@ -187,10 +187,10 @@ async def get_output_note(request: web.Request) -> web.Response:
     @routes.get(r"/fs/{path:.+}")
     def get(request: web.Request):
         path = request.match_info.get("path", "")
-        fullpath = osp.join(root_path, path)
+        fullpath = osp.join(abs_root_path, path)
         assert fullpath.startswith(
-            root_path
-        ), f"{fullpath} must be within {root_path}"
+            abs_root_path
+        ), f"{fullpath} must be within {abs_root_path}"
 
         def handle_fs_tree(p: str, fn: callable) -> dict:
             if osp.isdir(p):
@@ -207,9 +207,11 @@ def handle_fs_tree(p: str, fn: callable) -> dict:
 
         def get_stat(path):
             stat = os.stat(path)
+            rel_path = osp.relpath(path, abs_root_path)
             st = {
-                "title": osp.basename(path),
-                "path": path,
+                "title": osp.basename(rel_path),
+                "path": rel_path,
+                "dirname": osp.dirname(rel_path),
                 "from_root": abs_root_path,
                 "access_time": int(stat.st_atime),
                 "modification_time": int(stat.st_mtime),
@@ -252,11 +254,16 @@ def get_stat(path):
         )
 
     @routes.put("/fs")
-    @routes.put(r"/fs/{path:[\w\d\./\-\+]+}")
+    @routes.put(r"/fs/{path:.+}")
    async def put(request: web.Request):
         path = request.match_info.get("path")
         fullpath = osp.join(root_path, path)
         data = await request.json()
+        if request.query.get("mv"):
+            topath = osp.join(root_path, request.query.get("mv"))
+            os.rename(fullpath, topath)
+            return web.json_response({}, status=200)
+
         is_file = data.get("is_file", False)
         file_contents = data.get("file_contents", "")
         hash_key = data.get("hash_key", "")
@@ -282,21 +289,19 @@ async def put(request: web.Request):
                 f.write(file_contents)
         return web.json_response({}, status=201)
 
-    @routes.delete("/fs/{path}")
+    @routes.delete("/fs/{path:.+}")
     def delete(request):
-        path = request.match_info["path"]
-        path_components = path.split("/")
-        if "." in path_components or ".." in path_components:
-            return web.json_response(
-                {"reason": "Path must be absolute."}, status=400
-            )
+        path = request.match_info.get("path")
+        fullpath = osp.join(root_path, path)
+        assert fullpath.startswith(
+            root_path
+        ), f"{fullpath} must be within {root_path}"
 
-        fullpath = "%s/%s" % (root_path, path)
         if osp.exists(fullpath):
             if osp.isdir(fullpath):
                 if os.listdir(fullpath) == []:
                     os.rmdir(fullpath)
-                    return web.json_response(status=204)
+                    return web.json_response({"success": True}, status=204)
                 else:
                     return web.json_response(
                         {"reason": "/%s: Directory is not empty." % path},
@@ -304,7 +309,7 @@ def delete(request):
                     )
             else:
                 os.remove(fullpath)
-                return web.json_response(status=204)
+                return web.json_response({"success": True}, status=204)
         else:
             return web.json_response(
                 {"reason": "/%s: No such file or directory." % path}, status=404
diff --git a/graphbook/state.py b/graphbook/state.py
index 30458dd..ca54e70 100644
--- a/graphbook/state.py
+++ b/graphbook/state.py
@@ -119,11 +119,7 @@ def get_nodes(self) -> Tuple[dict, dict]:
 
 
 class GraphState:
-    def __init__(
-        self,
-        custom_nodes_path: str,
-        view_manager_queue: mp.Queue
-    ):
+    def __init__(self, custom_nodes_path: str, view_manager_queue: mp.Queue):
         sys.path.append(custom_nodes_path)
         self.custom_nodes_path = custom_nodes_path
         self.view_manager_queue = view_manager_queue
@@ -289,7 +285,7 @@ def handle_outputs(self, step_id: str, outputs: Outputs):
         self._step_states[step_id].add(StepState.EXECUTED_THIS_RUN)
         self.view_manager.handle_queue_size(step_id, self._queues[step_id].dict_sizes())
 
-    def clear_outputs(self, step_id: str = None):
+    def clear_outputs(self, step_id: str | None = None):
         if step_id is None:
             for q in self._queues.values():
                 q.clear()
diff --git a/graphbook/viewer.py b/graphbook/viewer.py
index 6432eea..559b687 100644
--- a/graphbook/viewer.py
+++ b/graphbook/viewer.py
@@ -28,6 +28,9 @@ def handle_start(self, node_id: str):
     def handle_end(self):
         pass
 
+    def handle_clear(self, node_id: str):
+        pass
+
     def get_next(self):
         return None
 
@@ -52,6 +55,12 @@ def handle_outputs(self, node_id: str, output: dict):
         new_entries = {k: v[0].items for k, v in output.items() if len(v) > 0}
         self.last_outputs[node_id] |= new_entries
 
+    def handle_clear(self, node_id: str | None = None):
+        if node_id is None:
+            self.last_outputs = {}
+        if node_id in self.last_outputs:
+            del self.last_outputs[node_id]
+
     def get_next(self):
         return self.last_outputs
 
@@ -262,6 +271,10 @@ def handle_start(self, node_id: str):
         for viewer in self.viewers:
             viewer.handle_start(node_id)
 
+    def handle_clear(self, node_id: str | None):
+        for viewer in self.viewers:
+            viewer.handle_clear(node_id)
+
     def handle_log(self, node_id: str, log: str, type: str):
         self.logs_viewer.handle_log(node_id, log, type)
 
@@ -300,6 +313,8 @@ def _loop(self):
                 self.handle_log(work["node_id"], work["log"], work["type"])
             elif work["cmd"] == "handle_run_state":
                 self.handle_run_state(work["is_running"])
+            elif work["cmd"] == "handle_clear":
+                self.handle_clear(work["node_id"])
         except queue.Empty:
             pass
 
@@ -349,6 +364,9 @@ def handle_run_state(self, is_running: bool):
             {"cmd": "handle_run_state", "is_running": is_running}
        )
 
+    def handle_clear(self, node_id: str |
None): + self.view_manager_queue.put({"cmd": "handle_clear", "node_id": node_id}) + class Logger: def __init__(self, view_manager_queue: mp.Queue, node_id: str, node_name: str): diff --git a/web/src/api.ts b/web/src/api.ts index 7db1a90..80b1c7d 100644 --- a/web/src/api.ts +++ b/web/src/api.ts @@ -141,6 +141,22 @@ export class ServerAPI { } } + private async delete(path): Promise { + try { + const response = await fetch(`http://${this.host}/${path}`, { + method: 'DELETE', + headers: { + 'Content-Type': 'application/json' + } + }); + if (response.ok) { + return await response.json(); + } + } catch (e) { + console.error(e); + return null; + } + } public addWsEventListener(eventType: string, callback: EventListenerOrEventListenerObject) { this.listeners.add([eventType, callback]); @@ -207,18 +223,26 @@ export class ServerAPI { return workflowFiles; } - public async putFile(filepath, isFile, content = null, hash_key = null) { + public async putFile(filepath: string, isFile: boolean, content: string = '', hash_key: string = '') { return await this.put(`fs/${filepath}`, { is_file: isFile, - file_contents: content ?? '', - hash_key: hash_key ?? '' + file_contents: content, + hash_key: hash_key }); } + public async mvFile(oldPath, newPath) { + return await this.put(`fs/${oldPath}?mv=${newPath}`, {}); + } + public async getFile(filepath) { return await this.get(`fs/${filepath}`); } + public async rmFile(filepath) { + return await this.delete(`fs/${filepath}`); + } + public async getSubflowFromFile(filepath) { const res = await this.getFile(filepath); if (res?.content) { diff --git a/web/src/components/LeftPanel/Filesystem.jsx b/web/src/components/LeftPanel/Filesystem.jsx deleted file mode 100644 index c76924a..0000000 --- a/web/src/components/LeftPanel/Filesystem.jsx +++ /dev/null @@ -1,218 +0,0 @@ -import { Flex, Input, Tree, Button, Typography } from "antd"; -import { useState, useMemo, useCallback, useEffect } from "react"; -import { FileAddOutlined, FolderAddOutlined, UndoOutlined } from "@ant-design/icons"; -import { useAPI } from "../../hooks/API"; -import { keyRecursively } from "../../utils"; -import { filesystemDragBegin } from "../../utils"; -import DefaultWorkflow from "../../DefaultWorkflow.json"; -const { Text } = Typography; -const { Search } = Input; - -import './filesystem.css'; - -const initialFiles = []; - -const getParentKey = (key, tree) => { - let parentKey; - for (let i = 0; i < tree.length; i++) { - const node = tree[i]; - if (node.children) { - if (node.children.some((item) => item.key === key)) { - parentKey = node.key; - } else if (getParentKey(key, node.children)) { - parentKey = getParentKey(key, node.children); - } - } - } - return parentKey; - }; - -export default function Filesystem({ setWorkflow, onBeginEdit }) { - const [files, setFiles] = useState(initialFiles); - const [filesRoot, setFilesRoot] = useState('.'); - const [expandedKeys, setExpandedKeys] = useState([]); - const [searchValue, setSearchValue] = useState(''); - const [autoExpandParent, setAutoExpandParent] = useState(true); - const [addingState, setAddingState] = useState({ isAddingItem: false, isAddingFile: true }); - const API = useAPI(); - - const getFiles = useCallback(async () => { - if (API === null) { - return; - } - const files = await API.listFiles(); - if (!files) { - return; - } - const splitPath = files.children[0].from_root.split('/'); - const filesRoot = splitPath[splitPath.length-1]; - setFiles(files.children); - setFilesRoot(filesRoot); - }, [API]); - - useEffect(() => { - getFiles(); - 
}, [API]); - - const onExpand = (newExpandedKeys) => { - setExpandedKeys(newExpandedKeys); - setAutoExpandParent(false); - }; - - const onSearchChange = (e) => { - const { value } = e.target; - const newExpandedKeys = []; - const findExpandedKeys = (data) => { - data.forEach((item) => { - if (item.children) { - findExpandedKeys(item.children); - } - if (item.title.indexOf(value) > -1) { - newExpandedKeys.push(getParentKey(item.key, files)); - } - }); - } - findExpandedKeys(files); - setExpandedKeys(newExpandedKeys); - setSearchValue(value); - setAutoExpandParent(true); - }; - - const onFileItemClick = useCallback((selectedKeys, { node }) => { - if (!node) { - onBeginEdit(null); - return; - } - const { filename } = node.title.props; - if (!filename) { - onBeginEdit(null); - return; - } - if (filename.slice(-3) == '.py') { - onBeginEdit({name: filename}); - } else if(filename.slice(-5) == '.json') { - setWorkflow(filename); - onBeginEdit(null); - } else { - onBeginEdit(null); - } - }); - - const onAddItem = useCallback(async (e, isFile) => { - const { value } = e.target; - if (!value) { - setAddingState({ isAddingItem: false, isAddingFile: true }); - return; - } - try { - const filename = e.target.value; - const content = filename.endsWith('.json') ? DefaultWorkflow : ""; - await API.putFile(filename, isFile, JSON.stringify(content)); - setWorkflow(filename); - getFiles(); - } catch (e) { - console.error(e); - } - setAddingState({ isAddingItem: false, isAddingFile: true }); - }); - - const treeData = useMemo(() => { - const loop = (data, parentName="") => ( - data.map((item) => { - const strTitle = item.title; - const filename = parentName + strTitle; - const index = strTitle.indexOf(searchValue); - const beforeStr = strTitle.substring(0, index); - const afterStr = strTitle.slice(index + searchValue.length); - let title = - index > -1 ? ( - - {beforeStr} - {searchValue} - {afterStr} - - - ) : ( - - {strTitle} - - ); - - if (item.children) { - title = ; - return { ...item, title, children: loop(item.children, filename + "/"), isLeaf: false, strTitle }; - } - - title = onFileItemClick(title, filename)}/>; - - return { - ...item, - title, - isLeaf: true, - strTitle - }; - }).sort((a, b) => { - if (a.isLeaf !== b.isLeaf) { - return a.isLeaf ? 1 : -1; - } - - return a.strTitle.localeCompare(b.strTitle); - }) - ); - - const currItems = loop(files); - const { isAddingItem, isAddingFile } = addingState; - const callback = (e) => onAddItem(e, isAddingFile); - const pendingItem = { - title: , - isLeaf: isAddingFile, - selectable: false, - disabled: true - }; - const items = isAddingItem ? [...currItems, pendingItem] : currItems; - return keyRecursively(items); - }, [searchValue, files, addingState]); - - return ( -
- - - {filesRoot}/ -
-
-
- -
- ); - -} - -function DirItem({ title }) { - return ( - - {title} - - ); -} - -function FileItem({ title, filename, fullpath, onClick }) { - const onDragStart = useCallback((e) => { - filesystemDragBegin(filename, e); - }, [filename]); - - return ( - - {title} - - ); -} \ No newline at end of file diff --git a/web/src/components/LeftPanel/Filesystem.tsx b/web/src/components/LeftPanel/Filesystem.tsx new file mode 100644 index 0000000..6a12dd7 --- /dev/null +++ b/web/src/components/LeftPanel/Filesystem.tsx @@ -0,0 +1,364 @@ +import { Flex, Input, Tree, Button, Typography, Menu } from "antd"; +import React, { useState, useMemo, useCallback, useEffect } from "react"; +import { FileAddOutlined, FolderAddOutlined, UndoOutlined } from "@ant-design/icons"; +import { useAPI } from "../../hooks/API"; +import { filesystemDragBegin } from "../../utils"; +import DefaultWorkflow from "../../DefaultWorkflow.json"; +import type { TreeProps } from 'antd'; +const { Text } = Typography; +const { Search } = Input; + +import './filesystem.css'; + + +export default function Filesystem({ setWorkflow, onBeginEdit }) { + const [files, setFiles] = useState([]); + const [filesRoot, setFilesRoot] = useState('.'); + const [expandedKeys, setExpandedKeys] = useState([]); + const [searchValue, setSearchValue] = useState(''); + const [autoExpandParent, setAutoExpandParent] = useState(true); + const [addingState, setAddingState] = useState({ isAddingItem: false, isAddingFile: true }); + const [contextMenu, setContextMenu] = useState<{ x: number, y: number, filename: string } | null>(null); + const [renamingState, setRenamingState] = useState({ isRenaming: false, filename: '' }); + const [selectedWorkflow, setSelectedWorkflow] = useState(null); + const API = useAPI(); + + useEffect(() => { + const removeContextMenu = () => { + setContextMenu(null); + }; + + window.addEventListener('click', removeContextMenu); + return () => { + window.removeEventListener('click', removeContextMenu); + }; + }, []); + + const getFiles = useCallback(async () => { + if (API === null) { + return; + } + const files = await API.listFiles(); + if (!files) { + return; + } + + const splitPath = files.children[0].from_root.split('/'); + const filesRoot = splitPath[splitPath.length - 1]; + const setKey = (data) => { + data.forEach((item) => { + item.key = item.path; + if (item.children) { + setKey(item.children); + } + }); + }; + setKey(files.children); + setFiles(files.children); + setFilesRoot(filesRoot); + }, [API]); + + useEffect(() => { + getFiles(); + }, [API]); + + const onExpand = (newExpandedKeys) => { + setExpandedKeys(newExpandedKeys); + setAutoExpandParent(false); + }; + + const onSearchChange = useCallback((e) => { + const { value } = e.target; + const newExpandedKeys: string[] = []; + const findExpandedKeys = (data, parentKey: string | null) => { + data.forEach((item) => { + if (item.children) { + findExpandedKeys(item.children, item.key); + } + if (item.path.indexOf(value) > -1 && parentKey) { + newExpandedKeys.push(parentKey); + } + }); + }; + findExpandedKeys(files, null); + setExpandedKeys(newExpandedKeys); + setSearchValue(value); + setAutoExpandParent(true); + }, [files]); + + const onFileItemClick = useCallback((selectedKeys, { node }) => { + if (!node) { + onBeginEdit(null); + return; + } + const filename = node.path; + if (!filename) { + onBeginEdit(null); + return; + } + if (filename.slice(-3) == '.py') { + onBeginEdit({ name: filename }); + } else if (filename.slice(-5) == '.json') { + setWorkflow(filename); + 
setSelectedWorkflow(filename); + onBeginEdit(null); + } else { + onBeginEdit(null); + } + }, []); + + const onFileItemRightClick = useCallback(({ event, node }) => { + setContextMenu({ x: event.clientX, y: event.clientY, filename: node.path }); + }, []); + + const onItemRename = useCallback(async (newFilename) => { + if (API) { + await API.mvFile(renamingState.filename, newFilename); + getFiles(); + } + setRenamingState({ isRenaming: false, filename: '' }); + }, [API, renamingState]); + + const onItemDelete = useCallback(async (filename) => { + if (API) { + await API.rmFile(filename); + getFiles(); + if (filename === selectedWorkflow) { + setWorkflow(null); + setSelectedWorkflow(null); + } + } + }, [API, selectedWorkflow]); + + const onAddItem = useCallback(async (e, isFile) => { + const { value } = e.target; + if (!value) { + setAddingState({ isAddingItem: false, isAddingFile: true }); + return; + } + try { + if (API) { + const filename = e.target.value; + const isJSON = filename.endsWith('.json'); + const content = isJSON ? DefaultWorkflow : ""; + await API.putFile(filename, isFile, JSON.stringify(content)); + if (isJSON) { + setWorkflow(filename); + setSelectedWorkflow(filename); + } + getFiles(); + } + } catch (e) { + console.error(e); + } + setAddingState({ isAddingItem: false, isAddingFile: true }); + }, [API]); + + const treeData = useMemo(() => { + const loop = (data) => ( + data.map((item) => { + const strTitle = item.title; + const filename = item.path; + const index = strTitle.indexOf(searchValue); + const beforeStr = strTitle.substring(0, index); + const afterStr = strTitle.slice(index + searchValue.length); + let title = + index > -1 ? ( + + {beforeStr} + {searchValue} + {afterStr} + + + ) : ( + + {strTitle} + + ); + + if (item.children) { + title = ( + + ); + return { ...item, title, children: loop(item.children), isLeaf: false, strTitle }; + } + + title = ( + + ); + + + return { + ...item, + title, + isLeaf: true, + strTitle + }; + }).sort((a, b) => { + if (a.isLeaf !== b.isLeaf) { + return a.isLeaf ? 1 : -1; + } + + return a.strTitle.localeCompare(b.strTitle); + }) + ); + + const currItems = loop(files); + const { isAddingItem, isAddingFile } = addingState; + const callback = (e) => onAddItem(e, isAddingFile); + const pendingItem = { + title: , + isLeaf: isAddingFile, + selectable: false, + disabled: true + }; + const items = isAddingItem ? [...currItems, pendingItem] : currItems; + return items; + }, [searchValue, files, addingState, renamingState]); + + const contextMenuItems = useMemo(() => { + return [ + { + key: 'rename', + label: 'Rename', + }, + { + key: 'delete', + label: 'Delete', + } + ]; + }, []); + + const onContextMenuClick = useCallback(({ key }) => { + if (contextMenu) { + if (key === 'rename') { + setRenamingState({ isRenaming: true, filename: contextMenu.filename }); + } else if (key === 'delete') { + onItemDelete(contextMenu.filename); + } + } + + setContextMenu(null); + }, [contextMenu]); + + const onDrop: TreeProps['onDrop'] = useCallback(async (info) => { + if (!API) { + return; + } + const basename = (p) => { + const parts = p.split('/'); + return parts[parts.length - 1]; + }; + const itemDragged = info.dragNode.path; + const itemDraggedBasename = basename(itemDragged); + const newDir = !info.dropToGap ? 
info.node.dirname + "/" + basename(info.node.path) : info.node.dirname; + let newItemDraggedName = newDir + "/" + itemDraggedBasename; + if (newItemDraggedName[0] === '/') { + newItemDraggedName = newItemDraggedName.slice(1); + } + + await API.mvFile(itemDragged, newItemDraggedName); + getFiles(); + }, [API]); + + return ( +
+ + + {filesRoot}/ +
+
+
+ + {contextMenu && ( + + )} +
+ ); + +} + +function DirItem({ title, filename, isRenaming, onRename }) { + const [currentFilename, setCurrentFilename] = useState(filename); + + const onChange = useCallback((e) => { + setCurrentFilename(e.target.value); + }, []); + + const onDone = useCallback(() => { + onRename(currentFilename); + }, [currentFilename, onRename]); + + if (isRenaming) { + return ( + + + + ); + } + + return ( + + {title} + + ); +} + +function FileItem({ title, filename, fullpath, isRenaming, onRename }) { + const [currentFilename, setCurrentFilename] = useState(filename); + + const onDragStart = useCallback((e) => { + filesystemDragBegin(filename, e); + }, [filename]); + + const onChange = useCallback((e) => { + setCurrentFilename(e.target.value); + }, []); + + const onDone = useCallback(() => { + onRename(currentFilename); + }, [currentFilename, onRename]); + + if (isRenaming) { + return ( + + + + ); + } + + return ( + + {title} + + ); +} \ No newline at end of file diff --git a/web/src/graph.ts b/web/src/graph.ts index 17d5205..5d311bd 100644 --- a/web/src/graph.ts +++ b/web/src/graph.ts @@ -41,6 +41,40 @@ type SerializedResourceMap = { [id: string]: SerializedResource }; +export const resolveSubflowOutputs = (exportNode: Node, nodes: Node[], edges: Edge[], parent = "") => { + const outputs: Array<{ node: string, pin: string }> = []; + const sourceEdges = edges.filter((edge) => edge.target === exportNode.id); + for (const edge of sourceEdges) { + const exported = nodes.find((n) => n.id === edge.source); + if (!exported) { + continue; + } + if (exported.type === 'step') { + outputs.push({ node: parent + "/" + exported.id, pin: edge.sourceHandle || '' }); + } else { + if (exported.type === 'group') { + const innerSourceHandle = `${edge.sourceHandle}_inner`; + for (const edge of edges) { + if (edge.targetHandle === innerSourceHandle) { + const innerExported = nodes.find((n) => n.id === edge.source); + if (!innerExported) { + continue; + } + if (innerExported.type === 'step') { + outputs.push({ node: parent + "/" + innerExported.id, pin: edge.sourceHandle || '' }); + } else if (innerExported.type === 'subflow') { + outputs.push(...innerExported.data.properties.stepOutputs[edge.sourceHandle || '']); + } + } + } + } else if (exported.type === 'subflow') { + outputs.push(...exported.data.properties.stepOutputs[edge.sourceHandle || '']); + } + } + } + return outputs; +}; + export const checkForSerializationErrors = (G, resources): SerializationError[] => { const errors: SerializationError[] = []; Object.entries(G).forEach(([id, node]) => { @@ -375,40 +409,6 @@ export const Graph = { localStorage.setItem('graph', Graph.serialize(nodes, edges)); }, parseGraph: async (graph: any, API: ServerAPI) => { - const resolveSubflowOutputs = (exportNode: Node, nodes: Node[], edges: Edge[], parent = "") => { - const outputs: Array<{ node: string, pin: string }> = []; - const sourceEdges = edges.filter((edge) => edge.target === exportNode.id); - for (const edge of sourceEdges) { - const exported = nodes.find((n) => n.id === edge.source); - if (!exported) { - continue; - } - if (exported.type === 'step') { - outputs.push({ node: parent + "/" + exported.id, pin: edge.sourceHandle || '' }); - } else { - if (exported.type === 'group') { - const innerSourceHandle = `${edge.sourceHandle}_inner`; - for (const edge of edges) { - if (edge.targetHandle === innerSourceHandle) { - const innerExported = nodes.find((n) => n.id === edge.source); - if (!innerExported) { - continue; - } - if (innerExported.type === 'step') { - outputs.push({ 
node: parent + "/" + innerExported.id, pin: edge.sourceHandle || '' }); - } else if (innerExported.type === 'subflow') { - outputs.push(...innerExported.data.properties.stepOutputs[edge.sourceHandle || '']); - } - } - } - } else if (exported.type === 'subflow') { - outputs.push(...exported.data.properties.stepOutputs[edge.sourceHandle || '']); - } - } - } - return outputs; - }; - const parseNodes = async (nodes: Array) => { for (const node of nodes) { if (node.type === 'subflow') { diff --git a/web/src/utils.ts b/web/src/utils.ts index 938329f..ef353c5 100644 --- a/web/src/utils.ts +++ b/web/src/utils.ts @@ -1,4 +1,4 @@ -import { Graph } from "./graph"; +import { Graph, resolveSubflowOutputs } from "./graph"; import type { ServerAPI } from "./api"; import type { ReactFlowInstance } from "reactflow"; import type { Node } from "reactflow"; @@ -60,6 +60,10 @@ export const filesystemDragEnd = async (reactFlowInstance: ReactFlowInstance, AP return; } + const { setNodes, getNodes } = reactFlowInstance; + const dropPosition = reactFlowInstance.screenToFlowPosition({ x: e.clientX, y: e.clientY }); + const nodes = getNodes(); + const id = uniqueIdFrom(nodes); let nodeData: any = { name: 'Text', parameters: { val: { type: "string", value: data.value } } @@ -72,24 +76,30 @@ export const filesystemDragEnd = async (reactFlowInstance: ReactFlowInstance, AP if (jsonData?.type === 'workflow') { type = 'subflow'; const name = data.value.split('/').pop().slice(0, -5); + const stepOutputs = {}; + let currentStepOutputId = 0; + for (const n of jsonData.nodes) { + if (n.type === 'export' && n.data.exportType === 'output' && !n.data.isResource) { + stepOutputs[currentStepOutputId++] = resolveSubflowOutputs(n, jsonData.nodes, jsonData.edges, id); + } + } nodeData = { name, label: name, filename: data.value, properties: { nodes: jsonData.nodes, - edges: jsonData.edges + edges: jsonData.edges, + stepOutputs + } } } } } - const { setNodes, getNodes } = reactFlowInstance; - const dropPosition = reactFlowInstance.screenToFlowPosition({ x: e.clientX, y: e.clientY }); - const nodes = getNodes(); const node = { - id: uniqueIdFrom(nodes), + id, position: dropPosition, type, data: nodeData
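graph.ts now exports resolveSubflowOutputs so that utils.ts can precompute a subflow node's stepOutputs at drop time instead of re-deriving them during parsing. The sketch below condenses the loop added to filesystemDragEnd; computeStepOutputs is a hypothetical helper name, and the shape of the dropped workflow JSON (nodes/edges plus export-node data fields) is assumed from the hunks above.

```ts
import type { Node, Edge } from 'reactflow';
import { resolveSubflowOutputs } from './graph';

// Sketch: build the stepOutputs map for a dropped workflow file, mirroring the
// loop added to filesystemDragEnd in utils.ts. `parentId` is the id assigned to
// the new subflow node on the canvas.
function computeStepOutputs(
    nodes: Node[],
    edges: Edge[],
    parentId: string
): Record<number, Array<{ node: string, pin: string }>> {
    const stepOutputs: Record<number, Array<{ node: string, pin: string }>> = {};
    let currentStepOutputId = 0;
    for (const n of nodes) {
        // Only step-valued output exports contribute an entry, as in the diff.
        if (n.type === 'export' && n.data.exportType === 'output' && !n.data.isResource) {
            stepOutputs[currentStepOutputId++] = resolveSubflowOutputs(n, nodes, edges, parentId);
        }
    }
    return stepOutputs;
}
```

The result is stored under data.properties.stepOutputs on the subflow node, which is what the inner resolveSubflowOutputs recursion reads when a subflow is nested inside another subflow.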
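On the server side, the patch adds two filesystem operations the web client can drive: a move/rename via PUT /fs/{path}?mv={newPath} and a delete via DELETE /fs/{path}, wrapped by ServerAPI.mvFile and ServerAPI.rmFile. A minimal sketch of calling the raw endpoints with fetch follows; the host value and the renameWorkflow/deleteWorkflow helper names are assumptions for illustration. Note that the delete handler replies with status 204, and browsers expose 204 responses with an empty body, so the sketch checks the status rather than parsing JSON.

```ts
// Minimal sketch of driving the new /fs endpoints directly with fetch.
// Assumption: the Graphbook server is reachable at `host` (e.g. "localhost:8005").

async function renameWorkflow(host: string, oldPath: string, newPath: string): Promise<boolean> {
    // PUT /fs/{oldPath}?mv={newPath} -> the handler calls os.rename(fullpath, topath).
    // A JSON body is still required because the handler awaits request.json() first.
    const response = await fetch(
        `http://${host}/fs/${oldPath}?mv=${encodeURIComponent(newPath)}`,
        {
            method: 'PUT',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({}),
        }
    );
    return response.ok;
}

async function deleteWorkflow(host: string, path: string): Promise<boolean> {
    // DELETE /fs/{path} -> 204 on success, 400 if a directory is not empty, 404 if missing.
    const response = await fetch(`http://${host}/fs/${path}`, { method: 'DELETE' });
    // Don't call response.json() here: fetch treats 204 as having no body.
    return response.status === 204;
}

// Example usage (hypothetical paths):
// await renameWorkflow('localhost:8005', 'workflows/old.json', 'workflows/new.json');
// await deleteWorkflow('localhost:8005', 'workflows/new.json');
```

For the same reason, ServerAPI.delete's `return await response.json()` will typically fall into its catch block on a successful 204, so callers of rmFile should not rely on its resolved value.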
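get_stat now reports paths relative to the served root (path plus the new dirname field), and the rewritten Filesystem.tsx keys the antd Tree on item.path. A rough TypeScript shape for the entries returned by GET /fs is sketched below; FsEntry is a hypothetical name, only the fields visible in this diff are listed, and any other fields the server may include are left out.

```ts
// Sketch of the fs-tree payload implied by get_stat()/handle_fs_tree in server.py.
interface FsEntry {
    title: string;              // basename of the relative path
    path: string;               // path relative to the served root (used as the Tree key)
    dirname: string;            // parent directory, also relative to the root
    from_root: string;          // absolute path of the served root (abs_root_path)
    access_time: number;        // st_atime, in seconds
    modification_time: number;  // st_mtime, in seconds
    children?: FsEntry[];       // present for directories
}

// Mirrors the setKey() pass in Filesystem.tsx: every tree node is keyed by its relative path.
function keyByPath(entries: FsEntry[]): void {
    for (const item of entries) {
        (item as FsEntry & { key?: string }).key = item.path;
        if (item.children) {
            keyByPath(item.children);
        }
    }
}
```

Keying on the relative path is also what makes the onDrop handler's rename target (info.node.dirname + "/" + basename) line up with the paths expected by the PUT ?mv= endpoint.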