Skip to content

Commit

Permalink
[WIP] First basic draft of the dispatch system using Redis (#1310)
Browse files Browse the repository at this point in the history
* [WIP] First basic draft of the dispatch system using Redis

* Add APIs: POST /conv, POST conv/message, GET conv/sub

* Making sub works

* Clean up docker compose

* Addressing @spolu's comments

* Clean up after rebasing

* One redis client per connection
  • Loading branch information
lasryaric authored Sep 8, 2023
1 parent ccb115e commit cc00fae
Show file tree
Hide file tree
Showing 9 changed files with 535 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ services:
- dustvolume:/qdrant
ports:
- 6334:6334
- 6333:6333
redis:
image: redis
ports:
- 6379:6379


volumes:
Expand Down
158 changes: 158 additions & 0 deletions front/lib/api/assistant/pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import {
AgentActionEvent,
AgentActionSuccessEvent,
AgentErrorEvent,
AgentGenerationSuccessEvent,
} from "@app/lib/api/assistant/agent";
import {
AgentMessageNewEvent,
postUserMessage,
UserMessageNewEvent,
} from "@app/lib/api/assistant/conversation";
import { GenerationTokensEvent } from "@app/lib/api/assistant/generation";
import { Authenticator } from "@app/lib/auth";
import { redisClient } from "@app/lib/redis";
import logger from "@app/logger/logger";
import {
ConversationType,
Mention,
UserMessageContext,
} from "@app/types/assistant/conversation";

export async function postUserMessageWithPubSub(
auth: Authenticator,
{
conversation,
message,
mentions,
context,
}: {
conversation: ConversationType;
message: string;
mentions: Mention[];
context: UserMessageContext;
}
) {
const redis = await redisClient();
try {
for await (const event of postUserMessage(auth, {
conversation,
message,
mentions,
context,
})) {
switch (event.type) {
case "user_message_new":
case "agent_message_new": {
const pubsubChannel = getConversationChannelId(conversation.sId);
await redis.xAdd(pubsubChannel, "*", {
payload: JSON.stringify(event),
});
break;
}
case "retrieval_params":
case "agent_error":
case "agent_action_success":
case "retrieval_documents":
case "generation_tokens":
case "agent_generation_success":
case "agent_message_success": {
const pubsubChannel = getMessageChannelId(event.messageId);
await redis.xAdd(pubsubChannel, "*", {
payload: JSON.stringify(event),
});
break;
}

default:
((blockParent: never) => {
logger.error("Unknown event type", blockParent);
})(event);
return null;
}
}
} finally {
await redis.quit();
}
console.log("exiting postUserMessageWithPubSub", message, conversation.sId);
}

export async function* getConversationEvents(
conversationId: string,
lastEventId: string | null
): AsyncGenerator<{
eventId: string;
data: UserMessageNewEvent | AgentMessageNewEvent;
}> {
const redis = await redisClient();
const pubsubChannel = getConversationChannelId(conversationId);

try {
while (true) {
const events = await redis.xRead(
{ key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" },
// weird, xread does not return on new message when count is = 1. Anything over 1 works.
{ COUNT: 1, BLOCK: 60 * 1000 }
);
if (!events) {
return;
}
for (const event of events) {
for (const message of event.messages) {
const payloadStr = message.message["payload"];
const messageId = message.id;
const payload = JSON.parse(payloadStr);
lastEventId = messageId;
yield {
eventId: messageId,
data: payload,
};
}
}
}
} finally {
await redis.quit();
}
}

export async function* getMessagesEvents(
messageId: string,
lastEventId: string | null
): AsyncGenerator<{
eventId: string;
data:
| AgentErrorEvent
| AgentActionEvent
| AgentActionSuccessEvent
| GenerationTokensEvent
| AgentGenerationSuccessEvent;
}> {
const pubsubChannel = getMessageChannelId(messageId);
const client = await redisClient();
const events = await client.xRead(
{ key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" },
{ COUNT: 1, BLOCK: 60 * 1000 }
);
if (!events) {
return;
}
for (const event of events) {
for (const message of event.messages) {
const payloadStr = message.message["payload"];
const messageId = message.id;
const payload = JSON.parse(payloadStr);
yield {
eventId: messageId,
data: payload,
};
}
}
}

function getConversationChannelId(channelId: string) {
return `conversation-${channelId}`;
}

function getMessageChannelId(messageId: string) {
return `message-${messageId}`;
}
3 changes: 2 additions & 1 deletion front/lib/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ export type APIErrorType =
| "extracted_event_not_found"
| "connector_update_error"
| "connector_update_unauthorized"
| "connector_oauth_target_mismatch";
| "connector_oauth_target_mismatch"
| "conversation_not_found";

export type APIError = {
type: APIErrorType;
Expand Down
16 changes: 16 additions & 0 deletions front/lib/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { createClient } from "redis";

export async function redisClient() {
const { REDIS_URI } = process.env;
if (!REDIS_URI) {
throw new Error("REDIS_URI is not defined");
}
const client = createClient({
url: REDIS_URI,
});
client.on("error", (err) => console.log("Redis Client Error", err));

await client.connect();

return client;
}
83 changes: 83 additions & 0 deletions front/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions front/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"react-markdown": "^8.0.7",
"react-p5": "^1.3.35",
"react-textarea-autosize": "^8.4.0",
"redis": "^4.6.8",
"remark-gfm": "^3.0.1",
"sequelize": "^6.31.0",
"showdown": "^2.1.0",
Expand Down
Loading

0 comments on commit cc00fae

Please sign in to comment.