diff --git a/docker-compose.yml b/docker-compose.yml index 514308639d09b..46eb486255154 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,9 +13,14 @@ services: qdrant: image: qdrant/qdrant volumes: - - dustvolume:/qdrant + - dustvolume:/qdrant ports: - 6334:6334 + - 6333:6333 + redis: + image: redis + ports: + - 6379:6379 volumes: diff --git a/front/lib/api/assistant/pubsub.ts b/front/lib/api/assistant/pubsub.ts new file mode 100644 index 0000000000000..2f01642fd1a54 --- /dev/null +++ b/front/lib/api/assistant/pubsub.ts @@ -0,0 +1,127 @@ +import { + AgentActionEvent, + AgentActionSuccessEvent, + AgentErrorEvent, + AgentGenerationSuccessEvent, + AgentGenerationTokensEvent, + AgentMessageNewEvent, +} from "@app/lib/api/assistant/agent"; +import { + postUserMessage, + UserMessageNewEvent, +} from "@app/lib/api/assistant/conversation"; +import { redisClient } from "@app/lib/redis"; + +type PostUserMessageParams = Parameters; + +export async function postUserMessageWithPubSub( + ...args: PostUserMessageParams +) { + const client = await redisClient(); + for await (const event of postUserMessage(...args)) { + switch (event.type) { + case "user_message_new": + case "agent_message_new": { + const pubsubChannel = getConversationChannelId( + args[1].conversation.sId + ); + await client.xAdd(pubsubChannel, "*", { + payload: JSON.stringify(event), + }); + break; + } + // missing retrieval_documents because it does not have a messageId field. + case "agent_generation_tokens": + case "agent_error": + case "agent_action_success": + case "retrieval_documents": { + const pubsubChannel = getMessageChannelId(event.messageId); + await client.xAdd(pubsubChannel, "*", { + payload: JSON.stringify(event), + }); + break; + } + case "agent_generation_success": { + const pubsubChannel = getMessageChannelId(event.message.sId); + await client.xAdd(pubsubChannel, "*", { + payload: JSON.stringify(event), + }); + break; + } + + default: + throw new Error(`Unhandled event. ${event.type}`); + } + } +} + +export async function* getConversationEvents( + conversationSID: string, + lastEventId: string | null +): AsyncGenerator<{ + eventId: string; + data: UserMessageNewEvent | AgentMessageNewEvent; +}> { + const pubsubChannel = getConversationChannelId(conversationSID); + const client = await redisClient(); + const events = await client.xRead( + { key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" }, + { COUNT: 1, BLOCK: 10000 } + ); + if (!events) { + return; + } + for (const event of events) { + for (const message of event.messages) { + const payloadStr = message.message["payload"]; + const messageId = message.id; + const payload = JSON.parse(payloadStr); + yield { + eventId: messageId, + data: payload, + }; + } + } +} + +export async function* getMessagesEvents( + messageSID: string, + lastEventId: string | null +): AsyncGenerator<{ + eventId: string; + data: + | AgentErrorEvent + | AgentActionEvent + | AgentActionSuccessEvent + | AgentGenerationTokensEvent + | AgentGenerationSuccessEvent; +}> { + const pubsubChannel = getMessageChannelId(messageSID); + const client = await redisClient(); + const events = await client.xRead( + { key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" }, + { COUNT: 1, BLOCK: 10000 } + ); + if (!events) { + return; + } + for (const event of events) { + for (const message of event.messages) { + const payloadStr = message.message["payload"]; + const messageId = message.id; + const payload = JSON.parse(payloadStr); + yield { + eventId: messageId, + data: payload, + }; + } + } +} + +function getConversationChannelId(channelId: string) { + return `conversation-${channelId}`; +} + +function getMessageChannelId(messageId: string) { + return `message-${messageId}`; +} diff --git a/front/lib/redis.ts b/front/lib/redis.ts new file mode 100644 index 0000000000000..abefbbfb67999 --- /dev/null +++ b/front/lib/redis.ts @@ -0,0 +1,16 @@ +import { createClient } from "redis"; + +let REDIS_CLIENT: ReturnType | undefined = undefined; + +export async function redisClient() { + if (REDIS_CLIENT) { + return REDIS_CLIENT; + } + const client = createClient(); + client.on("error", (err) => console.log("Redis Client Error", err)); + + await client.connect(); + REDIS_CLIENT = client; + + return REDIS_CLIENT; +} diff --git a/front/package-lock.json b/front/package-lock.json index 3bdf4c9da9ca0..6b7b5c7cef771 100644 --- a/front/package-lock.json +++ b/front/package-lock.json @@ -48,6 +48,7 @@ "react-markdown": "^8.0.7", "react-p5": "^1.3.35", "react-textarea-autosize": "^8.4.0", + "redis": "^4.6.8", "remark-gfm": "^3.0.1", "sequelize": "^6.31.0", "showdown": "^2.1.0", @@ -2133,6 +2134,59 @@ "version": "1.1.0", "license": "BSD-3-Clause" }, + "node_modules/@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/client": { + "version": "1.5.9", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.9.tgz", + "integrity": "sha512-SffgN+P1zdWJWSXBvJeynvEnmnZrYmtKSRW00xl8pOPFOMJjxRR9u0frSxJpPR6Y4V+k54blJjGW7FgxbTI7bQ==", + "dependencies": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/@redis/graph": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz", + "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/json": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.4.tgz", + "integrity": "sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/search": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.3.tgz", + "integrity": "sha512-4Dg1JjvCevdiCBTZqjhKkGoC5/BcB7k9j99kdMnaXFXg8x4eyOIVg9487CMv7/BUVkFLZCaIh8ead9mU15DNng==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/time-series": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz", + "integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, "node_modules/@rushstack/eslint-patch": { "version": "1.3.2", "dev": true, @@ -4118,6 +4172,14 @@ "node": ">=6" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/co": { "version": "4.6.0", "dev": true, @@ -5962,6 +6024,14 @@ "node": ">=12" } }, + "node_modules/generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", + "engines": { + "node": ">= 4" + } + }, "node_modules/gensync": { "version": "1.0.0-beta.2", "license": "MIT", @@ -10666,6 +10736,19 @@ "node": ">= 12.13.0" } }, + "node_modules/redis": { + "version": "4.6.8", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.8.tgz", + "integrity": "sha512-S7qNkPUYrsofQ0ztWlTHSaK0Qqfl1y+WMIxrzeAGNG+9iUZB4HGeBgkHxE6uJJ6iXrkvLd1RVJ2nvu6H1sAzfQ==", + "dependencies": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.5.9", + "@redis/graph": "1.1.0", + "@redis/json": "1.0.4", + "@redis/search": "1.1.3", + "@redis/time-series": "1.0.5" + } + }, "node_modules/refractor": { "version": "4.8.1", "license": "MIT", diff --git a/front/package.json b/front/package.json index 832ac17efe7b4..f03c971ef856f 100644 --- a/front/package.json +++ b/front/package.json @@ -56,6 +56,7 @@ "react-markdown": "^8.0.7", "react-p5": "^1.3.35", "react-textarea-autosize": "^8.4.0", + "redis": "^4.6.8", "remark-gfm": "^3.0.1", "sequelize": "^6.31.0", "showdown": "^2.1.0",