Skip to content

Commit

Permalink
[WIP] First basic draft of the dispatch system using Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
lasryaric committed Sep 8, 2023
1 parent b0f480c commit 9f31ebb
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 1 deletion.
7 changes: 6 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ services:
qdrant:
image: qdrant/qdrant
volumes:
- dustvolume:/qdrant
- dustvolume:/qdrant
ports:
- 6334:6334
- 6333:6333
redis:
image: redis
ports:
- 6379:6379


volumes:
Expand Down
127 changes: 127 additions & 0 deletions front/lib/api/assistant/pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import {
AgentActionEvent,
AgentActionSuccessEvent,
AgentErrorEvent,
AgentGenerationSuccessEvent,
AgentGenerationTokensEvent,
AgentMessageNewEvent,
} from "@app/lib/api/assistant/agent";
import {
postUserMessage,
UserMessageNewEvent,
} from "@app/lib/api/assistant/conversation";
import { redisClient } from "@app/lib/redis";

type PostUserMessageParams = Parameters<typeof postUserMessage>;

export async function postUserMessageWithPubSub(
...args: PostUserMessageParams
) {
const client = await redisClient();
for await (const event of postUserMessage(...args)) {
switch (event.type) {
case "user_message_new":
case "agent_message_new": {
const pubsubChannel = getConversationChannelId(
args[1].conversation.sId
);
await client.xAdd(pubsubChannel, "*", {
payload: JSON.stringify(event),
});
break;
}
// missing retrieval_documents because it does not have a messageId field.
case "agent_generation_tokens":
case "agent_error":
case "agent_action_success":
case "retrieval_documents": {
const pubsubChannel = getMessageChannelId(event.messageId);
await client.xAdd(pubsubChannel, "*", {
payload: JSON.stringify(event),
});
break;
}
case "agent_generation_success": {
const pubsubChannel = getMessageChannelId(event.message.sId);
await client.xAdd(pubsubChannel, "*", {
payload: JSON.stringify(event),
});
break;
}

default:
throw new Error(`Unhandled event. ${event.type}`);
}
}
}

export async function* getConversationEvents(
conversationSID: string,
lastEventId: string | null
): AsyncGenerator<{
eventId: string;
data: UserMessageNewEvent | AgentMessageNewEvent;
}> {
const pubsubChannel = getConversationChannelId(conversationSID);
const client = await redisClient();
const events = await client.xRead(
{ key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" },
{ COUNT: 1, BLOCK: 10000 }
);
if (!events) {
return;
}
for (const event of events) {
for (const message of event.messages) {
const payloadStr = message.message["payload"];
const messageId = message.id;
const payload = JSON.parse(payloadStr);
yield {
eventId: messageId,
data: payload,
};
}
}
}

export async function* getMessagesEvents(
messageSID: string,
lastEventId: string | null
): AsyncGenerator<{
eventId: string;
data:
| AgentErrorEvent
| AgentActionEvent
| AgentActionSuccessEvent
| AgentGenerationTokensEvent
| AgentGenerationSuccessEvent;
}> {
const pubsubChannel = getMessageChannelId(messageSID);
const client = await redisClient();
const events = await client.xRead(
{ key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" },
{ COUNT: 1, BLOCK: 10000 }
);
if (!events) {
return;
}
for (const event of events) {
for (const message of event.messages) {
const payloadStr = message.message["payload"];
const messageId = message.id;
const payload = JSON.parse(payloadStr);
yield {
eventId: messageId,
data: payload,
};
}
}
}

function getConversationChannelId(channelId: string) {
return `conversation-${channelId}`;
}

function getMessageChannelId(messageId: string) {
return `message-${messageId}`;
}
16 changes: 16 additions & 0 deletions front/lib/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { createClient } from "redis";

let REDIS_CLIENT: ReturnType<typeof createClient> | undefined = undefined;

export async function redisClient() {
if (REDIS_CLIENT) {
return REDIS_CLIENT;
}
const client = createClient();
client.on("error", (err) => console.log("Redis Client Error", err));

await client.connect();
REDIS_CLIENT = client;

return REDIS_CLIENT;
}
83 changes: 83 additions & 0 deletions front/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions front/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"react-markdown": "^8.0.7",
"react-p5": "^1.3.35",
"react-textarea-autosize": "^8.4.0",
"redis": "^4.6.8",
"remark-gfm": "^3.0.1",
"sequelize": "^6.31.0",
"showdown": "^2.1.0",
Expand Down

0 comments on commit 9f31ebb

Please sign in to comment.