Skip to content

Commit

Permalink
Making sub works
Browse files Browse the repository at this point in the history
  • Loading branch information
lasryaric committed Sep 8, 2023
1 parent 8e5932d commit 0015fa6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
36 changes: 27 additions & 9 deletions front/lib/api/assistant/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,39 @@ import {
postUserMessage,
UserMessageNewEvent,
} from "@app/lib/api/assistant/conversation";
import { Authenticator } from "@app/lib/auth";
import { redisClient } from "@app/lib/redis";

type PostUserMessageParams = Parameters<typeof postUserMessage>;
import {
ConversationType,
Mention,
UserMessageContext,
} from "@app/types/assistant/conversation";

export async function postUserMessageWithPubSub(
...args: PostUserMessageParams
auth: Authenticator,
{
conversation,
message,
mentions,
context,
}: {
conversation: ConversationType;
message: string;
mentions: Mention[];
context: UserMessageContext;
}
) {
const client = await redisClient();
for await (const event of postUserMessage(...args)) {
for await (const event of postUserMessage(auth, {
conversation,
message,
mentions,
context,
})) {
switch (event.type) {
case "user_message_new":
case "agent_message_new": {
const pubsubChannel = getConversationChannelId(
args[1].conversation.sId
);
const pubsubChannel = getConversationChannelId(conversation.sId);
await client.xAdd(pubsubChannel, "*", {
payload: JSON.stringify(event),
});
Expand Down Expand Up @@ -68,10 +86,10 @@ export async function* getConversationEvents(
while (true) {
const events = await client.xRead(
{ key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" },
{ COUNT: 1, BLOCK: 10000 }
// weird, xread does not return on new message when count is = 1. Anything over 1 works.
{ COUNT: 10, BLOCK: 10000 }
);
if (!events) {
console.log("Nothing here", conversationSID, lastEventId);
return;
}
for (const event of events) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async function handler(
});

for await (const event of getConversationEvents(conv.sId, null)) {
res.write(event);
res.write(JSON.stringify(event));
// @ts-expect-error we need to flush for streaming but TS thinks flush() does not exists.
res.flush();
}
Expand Down

0 comments on commit 0015fa6

Please sign in to comment.