From 5917256e5762d5fecbdff3ca931b30f03e9c0d94 Mon Sep 17 00:00:00 2001 From: Henry Date: Tue, 19 Dec 2023 23:48:18 +0000 Subject: [PATCH 1/2] add fix to avoid running duplicated nodes when vector upsert --- .../nodes/vectorstores/Redis/Redis.ts | 6 +- .../chatflows/Multiple VectorDB.json | 4 +- packages/server/src/index.ts | 133 +++++++++++++++--- packages/server/src/utils/index.ts | 6 +- 4 files changed, 120 insertions(+), 29 deletions(-) diff --git a/packages/components/nodes/vectorstores/Redis/Redis.ts b/packages/components/nodes/vectorstores/Redis/Redis.ts index dc993b86699..49f9e8ffc3e 100644 --- a/packages/components/nodes/vectorstores/Redis/Redis.ts +++ b/packages/components/nodes/vectorstores/Redis/Redis.ts @@ -148,10 +148,10 @@ class Redis_VectorStores implements INode { } } - const redisClient = createClient({ url: redisUrl }) - await redisClient.connect() - try { + const redisClient = createClient({ url: redisUrl }) + await redisClient.connect() + const storeConfig: RedisVectorStoreConfig = { redisClient: redisClient, indexName: indexName diff --git a/packages/server/marketplaces/chatflows/Multiple VectorDB.json b/packages/server/marketplaces/chatflows/Multiple VectorDB.json index 789b0c08e94..a2a807cdc76 100644 --- a/packages/server/marketplaces/chatflows/Multiple VectorDB.json +++ b/packages/server/marketplaces/chatflows/Multiple VectorDB.json @@ -54,7 +54,7 @@ "inputs": { "name": "ai-paper-qa", "description": "AI Paper QA - useful for when you need to ask questions about the AI-Generated Content paper.", - "returnDirect": "", + "returnDirect": true, "baseChain": "{{retrievalQAChain_0.data.instance}}" }, "outputAnchors": [ @@ -128,7 +128,7 @@ "inputs": { "name": "state-of-union-qa", "description": "State of the Union QA - useful for when you need to ask questions about the president speech and most recent state of the union address.", - "returnDirect": "", + "returnDirect": true, "baseChain": "{{retrievalQAChain_1.data.instance}}" }, "outputAnchors": [ diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 8766e19365f..f69148bbd58 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -21,7 +21,8 @@ import { chatType, IChatMessage, IReactFlowEdge, - IDepthQueue + IDepthQueue, + INodeDirectedGraph } from './Interface' import { getNodeModulesPackagePath, @@ -44,7 +45,8 @@ import { checkMemorySessionId, clearSessionMemoryFromViewMessageDialog, getUserHome, - replaceChatHistory + replaceChatHistory, + getAllConnectedNodes } from './utils' import { cloneDeep, omit, uniqWith, isEqual } from 'lodash' import { getDataSource } from './DataSource' @@ -1091,12 +1093,12 @@ export class App { upload.array('files'), (req: Request, res: Response, next: NextFunction) => getRateLimiter(req, res, next), async (req: Request, res: Response) => { - await this.buildChatflow(req, res, undefined, false, true) + await this.upsertVector(req, res) } ) this.app.post('/api/v1/vector/internal-upsert/:id', async (req: Request, res: Response) => { - await this.buildChatflow(req, res, undefined, true, true) + await this.upsertVector(req, res, true) }) // ---------------------------------------- @@ -1373,6 +1375,108 @@ export class App { return undefined } + async upsertVector(req: Request, res: Response, isInternal: boolean = false) { + try { + const chatflowid = req.params.id + let incomingInput: IncomingInput = req.body + + const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ + id: chatflowid + }) + if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`) + + if (!isInternal) { + const isKeyValidated = await this.validateKey(req, chatflow) + if (!isKeyValidated) return res.status(401).send('Unauthorized') + } + + const files = (req.files as any[]) || [] + + if (files.length) { + const overrideConfig: ICommonObject = { ...req.body } + for (const file of files) { + const fileData = fs.readFileSync(file.path, { encoding: 'base64' }) + const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}` + + const fileInputField = mapMimeTypeToInputField(file.mimetype) + if (overrideConfig[fileInputField]) { + overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String]) + } else { + overrideConfig[fileInputField] = JSON.stringify([dataBase64String]) + } + } + incomingInput = { + question: req.body.question ?? 'hello', + overrideConfig, + history: [], + stopNodeId: req.body.stopNodeId + } + } + + /*** Get chatflows and prepare data ***/ + const flowData = chatflow.flowData + const parsedFlowData: IReactFlowObject = JSON.parse(flowData) + const nodes = parsedFlowData.nodes + const edges = parsedFlowData.edges + + let stopNodeId = incomingInput?.stopNodeId ?? '' + let chatHistory = incomingInput?.history + let chatId = incomingInput.chatId ?? '' + let isUpsert = true + + const vsNodes = nodes.filter( + (node) => + node.data.category === 'Vector Stores' && + !node.data.label.includes('Upsert') && + !node.data.label.includes('Load Existing') + ) + if (vsNodes.length > 1 && !stopNodeId) { + return res.status(500).send('There are multiple vector nodes, please provide stopNodeId in body request') + } else if (vsNodes.length === 1 && !stopNodeId) { + stopNodeId = vsNodes[0].data.id + } + + const { graph } = constructGraphs(nodes, edges, { isReversed: true }) + + const nodeIds = getAllConnectedNodes(graph, stopNodeId) + + const filteredGraph: INodeDirectedGraph = {} + for (const key of nodeIds) { + if (Object.prototype.hasOwnProperty.call(graph, key)) { + filteredGraph[key] = graph[key] + } + } + + const { startingNodeIds, depthQueue } = getStartingNodes(filteredGraph, stopNodeId) + + await buildLangchain( + startingNodeIds, + nodes, + edges, + filteredGraph, + depthQueue, + this.nodesPool.componentNodes, + incomingInput.question, + chatHistory, + chatId, + chatflowid, + this.AppDataSource, + incomingInput?.overrideConfig, + this.cachePool, + isUpsert, + stopNodeId + ) + + const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id)) + + this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig) + return res.status(201).send('Successfully Upserted') + } catch (e: any) { + logger.error('[server]: Error:', e) + return res.status(500).send(e.message) + } + } + /** * Build Chatflow * @param {Request} req @@ -1381,7 +1485,7 @@ export class App { * @param {boolean} isInternal * @param {boolean} isUpsert */ - async buildChatflow(req: Request, res: Response, socketIO?: Server, isInternal: boolean = false, isUpsert: boolean = false) { + async buildChatflow(req: Request, res: Response, socketIO?: Server, isInternal: boolean = false) { try { const chatflowid = req.params.id let incomingInput: IncomingInput = req.body @@ -1422,8 +1526,7 @@ export class App { question: req.body.question ?? 'hello', overrideConfig, history: [], - socketIOClientId: req.body.socketIOClientId, - stopNodeId: req.body.stopNodeId + socketIOClientId: req.body.socketIOClientId } } @@ -1451,8 +1554,7 @@ export class App { this.chatflowPool.activeChatflows[chatflowid].overrideConfig, incomingInput.overrideConfig ) && - !isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes) && - !isUpsert + !isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes) ) } @@ -1481,8 +1583,7 @@ export class App { if ( endingNodeData.outputs && Object.keys(endingNodeData.outputs).length && - !Object.values(endingNodeData.outputs).includes(endingNodeData.name) && - !isUpsert + !Object.values(endingNodeData.outputs).includes(endingNodeData.name) ) { return res .status(500) @@ -1542,17 +1643,9 @@ export class App { chatflowid, this.AppDataSource, incomingInput?.overrideConfig, - this.cachePool, - isUpsert, - incomingInput.stopNodeId + this.cachePool ) - // If request is upsert, stop here - if (isUpsert) { - this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig) - return res.status(201).send('Successfully Upserted') - } - const nodeToExecute = endingNodeIds.length === 1 ? reactFlowNodes.find((node: IReactFlowNode) => endingNodeIds[0] === node.id) diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 49cc430f276..eec813d4fb5 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -308,10 +308,8 @@ export const buildLangchain = async ( if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig) const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question, chatHistory) - if ( - isUpsert && - ((stopNodeId && reactFlowNodeData.id === stopNodeId) || (!stopNodeId && reactFlowNodeData.category === 'Vector Stores')) - ) { + // TODO: Avoid processing Text Splitter + Doc Loader once Upsert & Load Existing Vector Nodes are deprecated + if (isUpsert && stopNodeId && nodeId === stopNodeId) { logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`) await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, { chatId, From 46571c796f652000ac4a7d851cef4420ad872735 Mon Sep 17 00:00:00 2001 From: Henry Heng Date: Wed, 20 Dec 2023 01:16:15 +0000 Subject: [PATCH 2/2] Update index.ts --- packages/server/src/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index f69148bbd58..135e14511b9 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -1434,6 +1434,8 @@ export class App { return res.status(500).send('There are multiple vector nodes, please provide stopNodeId in body request') } else if (vsNodes.length === 1 && !stopNodeId) { stopNodeId = vsNodes[0].data.id + } else if (!vsNodes.length && !stopNodeId) { + return res.status(500).send('No vector node found') } const { graph } = constructGraphs(nodes, edges, { isReversed: true })