Skip to content

Commit

Permalink
Merge pull request FlowiseAI#1414 from FlowiseAI/bugfix/Multiple-Upsert
Browse files Browse the repository at this point in the history
Bugfix/Multiple Vector Upsert
  • Loading branch information
HenryHengZJ authored Dec 20, 2023
2 parents c636a9c + 46571c7 commit ab24524
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 29 deletions.
6 changes: 3 additions & 3 deletions packages/components/nodes/vectorstores/Redis/Redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ class Redis_VectorStores implements INode {
}
}

const redisClient = createClient({ url: redisUrl })
await redisClient.connect()

try {
const redisClient = createClient({ url: redisUrl })
await redisClient.connect()

const storeConfig: RedisVectorStoreConfig = {
redisClient: redisClient,
indexName: indexName
Expand Down
4 changes: 2 additions & 2 deletions packages/server/marketplaces/chatflows/Multiple VectorDB.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"inputs": {
"name": "ai-paper-qa",
"description": "AI Paper QA - useful for when you need to ask questions about the AI-Generated Content paper.",
"returnDirect": "",
"returnDirect": true,
"baseChain": "{{retrievalQAChain_0.data.instance}}"
},
"outputAnchors": [
Expand Down Expand Up @@ -128,7 +128,7 @@
"inputs": {
"name": "state-of-union-qa",
"description": "State of the Union QA - useful for when you need to ask questions about the president speech and most recent state of the union address.",
"returnDirect": "",
"returnDirect": true,
"baseChain": "{{retrievalQAChain_1.data.instance}}"
},
"outputAnchors": [
Expand Down
135 changes: 115 additions & 20 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import {
chatType,
IChatMessage,
IReactFlowEdge,
IDepthQueue
IDepthQueue,
INodeDirectedGraph
} from './Interface'
import {
getNodeModulesPackagePath,
Expand All @@ -44,7 +45,8 @@ import {
checkMemorySessionId,
clearSessionMemoryFromViewMessageDialog,
getUserHome,
replaceChatHistory
replaceChatHistory,
getAllConnectedNodes
} from './utils'
import { cloneDeep, omit, uniqWith, isEqual } from 'lodash'
import { getDataSource } from './DataSource'
Expand Down Expand Up @@ -1091,12 +1093,12 @@ export class App {
upload.array('files'),
(req: Request, res: Response, next: NextFunction) => getRateLimiter(req, res, next),
async (req: Request, res: Response) => {
await this.buildChatflow(req, res, undefined, false, true)
await this.upsertVector(req, res)
}
)

this.app.post('/api/v1/vector/internal-upsert/:id', async (req: Request, res: Response) => {
await this.buildChatflow(req, res, undefined, true, true)
await this.upsertVector(req, res, true)
})

// ----------------------------------------
Expand Down Expand Up @@ -1373,6 +1375,110 @@ export class App {
return undefined
}

async upsertVector(req: Request, res: Response, isInternal: boolean = false) {
try {
const chatflowid = req.params.id
let incomingInput: IncomingInput = req.body

const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: chatflowid
})
if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`)

if (!isInternal) {
const isKeyValidated = await this.validateKey(req, chatflow)
if (!isKeyValidated) return res.status(401).send('Unauthorized')
}

const files = (req.files as any[]) || []

if (files.length) {
const overrideConfig: ICommonObject = { ...req.body }
for (const file of files) {
const fileData = fs.readFileSync(file.path, { encoding: 'base64' })
const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}`

const fileInputField = mapMimeTypeToInputField(file.mimetype)
if (overrideConfig[fileInputField]) {
overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String])
} else {
overrideConfig[fileInputField] = JSON.stringify([dataBase64String])
}
}
incomingInput = {
question: req.body.question ?? 'hello',
overrideConfig,
history: [],
stopNodeId: req.body.stopNodeId
}
}

/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges

let stopNodeId = incomingInput?.stopNodeId ?? ''
let chatHistory = incomingInput?.history
let chatId = incomingInput.chatId ?? ''
let isUpsert = true

const vsNodes = nodes.filter(
(node) =>
node.data.category === 'Vector Stores' &&
!node.data.label.includes('Upsert') &&
!node.data.label.includes('Load Existing')
)
if (vsNodes.length > 1 && !stopNodeId) {
return res.status(500).send('There are multiple vector nodes, please provide stopNodeId in body request')
} else if (vsNodes.length === 1 && !stopNodeId) {
stopNodeId = vsNodes[0].data.id
} else if (!vsNodes.length && !stopNodeId) {
return res.status(500).send('No vector node found')
}

const { graph } = constructGraphs(nodes, edges, { isReversed: true })

const nodeIds = getAllConnectedNodes(graph, stopNodeId)

const filteredGraph: INodeDirectedGraph = {}
for (const key of nodeIds) {
if (Object.prototype.hasOwnProperty.call(graph, key)) {
filteredGraph[key] = graph[key]
}
}

const { startingNodeIds, depthQueue } = getStartingNodes(filteredGraph, stopNodeId)

await buildLangchain(
startingNodeIds,
nodes,
edges,
filteredGraph,
depthQueue,
this.nodesPool.componentNodes,
incomingInput.question,
chatHistory,
chatId,
chatflowid,
this.AppDataSource,
incomingInput?.overrideConfig,
this.cachePool,
isUpsert,
stopNodeId
)

const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.data.id))

this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)
return res.status(201).send('Successfully Upserted')
} catch (e: any) {
logger.error('[server]: Error:', e)
return res.status(500).send(e.message)
}
}

/**
* Build Chatflow
* @param {Request} req
Expand All @@ -1381,7 +1487,7 @@ export class App {
* @param {boolean} isInternal
* @param {boolean} isUpsert
*/
async buildChatflow(req: Request, res: Response, socketIO?: Server, isInternal: boolean = false, isUpsert: boolean = false) {
async buildChatflow(req: Request, res: Response, socketIO?: Server, isInternal: boolean = false) {
try {
const chatflowid = req.params.id
let incomingInput: IncomingInput = req.body
Expand Down Expand Up @@ -1422,8 +1528,7 @@ export class App {
question: req.body.question ?? 'hello',
overrideConfig,
history: [],
socketIOClientId: req.body.socketIOClientId,
stopNodeId: req.body.stopNodeId
socketIOClientId: req.body.socketIOClientId
}
}

Expand Down Expand Up @@ -1451,8 +1556,7 @@ export class App {
this.chatflowPool.activeChatflows[chatflowid].overrideConfig,
incomingInput.overrideConfig
) &&
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes) &&
!isUpsert
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes)
)
}

Expand Down Expand Up @@ -1481,8 +1585,7 @@ export class App {
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name) &&
!isUpsert
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
return res
.status(500)
Expand Down Expand Up @@ -1542,17 +1645,9 @@ export class App {
chatflowid,
this.AppDataSource,
incomingInput?.overrideConfig,
this.cachePool,
isUpsert,
incomingInput.stopNodeId
this.cachePool
)

// If request is upsert, stop here
if (isUpsert) {
this.chatflowPool.add(chatflowid, undefined, startingNodes, incomingInput?.overrideConfig)
return res.status(201).send('Successfully Upserted')
}

const nodeToExecute =
endingNodeIds.length === 1
? reactFlowNodes.find((node: IReactFlowNode) => endingNodeIds[0] === node.id)
Expand Down
6 changes: 2 additions & 4 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,8 @@ export const buildLangchain = async (
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question, chatHistory)

if (
isUpsert &&
((stopNodeId && reactFlowNodeData.id === stopNodeId) || (!stopNodeId && reactFlowNodeData.category === 'Vector Stores'))
) {
// TODO: Avoid processing Text Splitter + Doc Loader once Upsert & Load Existing Vector Nodes are deprecated
if (isUpsert && stopNodeId && nodeId === stopNodeId) {
logger.debug(`[server]: Upserting ${reactFlowNode.data.label} (${reactFlowNode.data.id})`)
await newNodeInstance.vectorStoreMethods!['upsert']!.call(newNodeInstance, reactFlowNodeData, {
chatId,
Expand Down

0 comments on commit ab24524

Please sign in to comment.