From 8e5932d7d48b6632b77efff54a4ef6c4553c7cd4 Mon Sep 17 00:00:00 2001 From: Aric Lasry Date: Fri, 8 Sep 2023 13:08:43 +0200 Subject: [PATCH] Add APIs: POST /conv, POST conv/message, GET conv/sub --- front/lib/api/assistant/pubsub.ts | 37 +++--- front/lib/error.ts | 3 +- .../w/[wId]/assistant/[cId]/messages/index.ts | 109 ++++++++++++++++++ .../[wId]/assistant/conversation/[cId]/sub.ts | 87 ++++++++++++++ .../w/[wId]/assistant/conversation/index.ts | 73 ++++++++++++ 5 files changed, 292 insertions(+), 17 deletions(-) create mode 100644 front/pages/api/v1/w/[wId]/assistant/[cId]/messages/index.ts create mode 100644 front/pages/api/v1/w/[wId]/assistant/conversation/[cId]/sub.ts create mode 100644 front/pages/api/v1/w/[wId]/assistant/conversation/index.ts diff --git a/front/lib/api/assistant/pubsub.ts b/front/lib/api/assistant/pubsub.ts index 2f01642fd1a5..c65893a0482d 100644 --- a/front/lib/api/assistant/pubsub.ts +++ b/front/lib/api/assistant/pubsub.ts @@ -64,22 +64,27 @@ export async function* getConversationEvents( }> { const pubsubChannel = getConversationChannelId(conversationSID); const client = await redisClient(); - const events = await client.xRead( - { key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" }, - { COUNT: 1, BLOCK: 10000 } - ); - if (!events) { - return; - } - for (const event of events) { - for (const message of event.messages) { - const payloadStr = message.message["payload"]; - const messageId = message.id; - const payload = JSON.parse(payloadStr); - yield { - eventId: messageId, - data: payload, - }; + + while (true) { + const events = await client.xRead( + { key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" }, + { COUNT: 1, BLOCK: 10000 } + ); + if (!events) { + console.log("Nothing here", conversationSID, lastEventId); + return; + } + for (const event of events) { + for (const message of event.messages) { + const payloadStr = message.message["payload"]; + const messageId = message.id; + const payload = JSON.parse(payloadStr); + lastEventId = messageId; + yield { + eventId: messageId, + data: payload, + }; + } } } } diff --git a/front/lib/error.ts b/front/lib/error.ts index 62c19b39f30e..1094048be3af 100644 --- a/front/lib/error.ts +++ b/front/lib/error.ts @@ -41,7 +41,8 @@ export type APIErrorType = | "extracted_event_not_found" | "connector_update_error" | "connector_update_unauthorized" - | "connector_oauth_target_mismatch"; + | "connector_oauth_target_mismatch" + | "conversation_not_found"; export type APIError = { type: APIErrorType; diff --git a/front/pages/api/v1/w/[wId]/assistant/[cId]/messages/index.ts b/front/pages/api/v1/w/[wId]/assistant/[cId]/messages/index.ts new file mode 100644 index 000000000000..f97a764b7d3a --- /dev/null +++ b/front/pages/api/v1/w/[wId]/assistant/[cId]/messages/index.ts @@ -0,0 +1,109 @@ +import { NextApiRequest, NextApiResponse } from "next"; + +import { postUserMessageWithPubSub } from "@app/lib/api/assistant/pubsub"; +import { Authenticator, getAPIKey } from "@app/lib/auth"; +import { ReturnedAPIErrorType } from "@app/lib/error"; +import { Conversation } from "@app/lib/models"; +import { apiError, withLogging } from "@app/logger/withlogging"; + +async function handler( + req: NextApiRequest, + res: NextApiResponse +): Promise { + const keyRes = await getAPIKey(req); + if (keyRes.isErr()) { + return apiError(req, res, keyRes.error); + } + + const { auth, keyWorkspaceId } = await Authenticator.fromKey( + keyRes.value, + req.query.wId as string + ); + + if (!keyRes.value.isSystem) { + return apiError(req, res, { + status_code: 400, + api_error: { + type: "invalid_request_error", + message: + "The Assitant API is only accessible by system API Key. Ping us at team@dust.tt if you want access to it.", + }, + }); + } + + if (keyWorkspaceId !== req.query.wId) { + return apiError(req, res, { + status_code: 400, + api_error: { + type: "invalid_request_error", + message: "The Assistant API is only available on your own workspace.", + }, + }); + } + + const conv = await Conversation.findOne({ + where: { + sId: req.query.cId as string, + }, + }); + if (!conv) { + return apiError(req, res, { + status_code: 404, + api_error: { + type: "conversation_not_found", + message: "Conversation not found.", + }, + }); + } + + // no time for actual io-ts parsing right now, so here is the expected structure. + // Will handle proper parsing later. + const payload = req.body as { + message: string; + context: { + timezone: string; + username: string; + fullName: string; + email: string; + profilePictureUrl: string; + }; + }; + + switch (req.method) { + case "POST": + const p = postUserMessageWithPubSub(auth, { + conversation: { + id: conv.id, + created: conv.created.getTime(), + sId: conv.sId, + title: conv.title, + // not sure how to provide the content here for now. + content: [], + visibility: conv.visibility, + }, + message: payload.message, + mentions: [], + context: { + timezone: payload.context.timezone, + username: payload.context.username, + fullName: payload.context.fullName, + email: payload.context.email, + profilePictureUrl: payload.context.profilePictureUrl, + }, + }); + res.status(200).end(); + await p; + return; + + default: + return apiError(req, res, { + status_code: 405, + api_error: { + type: "method_not_supported_error", + message: "The method passed is not supported, POST is expected.", + }, + }); + } +} + +export default withLogging(handler); diff --git a/front/pages/api/v1/w/[wId]/assistant/conversation/[cId]/sub.ts b/front/pages/api/v1/w/[wId]/assistant/conversation/[cId]/sub.ts new file mode 100644 index 000000000000..9df46af800e9 --- /dev/null +++ b/front/pages/api/v1/w/[wId]/assistant/conversation/[cId]/sub.ts @@ -0,0 +1,87 @@ +import { NextApiRequest, NextApiResponse } from "next"; + +import { getConversationEvents } from "@app/lib/api/assistant/pubsub"; +import { Authenticator, getAPIKey } from "@app/lib/auth"; +import { ReturnedAPIErrorType } from "@app/lib/error"; +import { Conversation } from "@app/lib/models"; +import { apiError, withLogging } from "@app/logger/withlogging"; + +async function handler( + req: NextApiRequest, + res: NextApiResponse +): Promise { + const keyRes = await getAPIKey(req); + if (keyRes.isErr()) { + return apiError(req, res, keyRes.error); + } + + if (!keyRes.value.isSystem) { + return apiError(req, res, { + status_code: 400, + api_error: { + type: "invalid_request_error", + message: + "The Assitant API is only accessible by system API Key. Ping us at team@dust.tt if you want access to it.", + }, + }); + } + const { keyWorkspaceId } = await Authenticator.fromKey( + keyRes.value, + req.query.wId as string + ); + + if (keyWorkspaceId !== req.query.wId) { + return apiError(req, res, { + status_code: 400, + api_error: { + type: "invalid_request_error", + message: "The Assistant API is only available on your own workspace.", + }, + }); + } + + const conv = await Conversation.findOne({ + where: { + sId: req.query.cId as string, + }, + }); + if (!conv) { + return apiError(req, res, { + status_code: 404, + api_error: { + type: "conversation_not_found", + message: "Conversation not found.", + }, + }); + } + + switch (req.method) { + case "GET": { + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }); + + for await (const event of getConversationEvents(conv.sId, null)) { + res.write(event); + // @ts-expect-error we need to flush for streaming but TS thinks flush() does not exists. + res.flush(); + } + + res.end(); + return; + } + + default: + return apiError(req, res, { + status_code: 405, + api_error: { + type: "method_not_supported_error", + message: "The method passed is not supported, POST is expected.", + }, + }); + } +} + +export default withLogging(handler); diff --git a/front/pages/api/v1/w/[wId]/assistant/conversation/index.ts b/front/pages/api/v1/w/[wId]/assistant/conversation/index.ts new file mode 100644 index 000000000000..fb9075d9321b --- /dev/null +++ b/front/pages/api/v1/w/[wId]/assistant/conversation/index.ts @@ -0,0 +1,73 @@ +import { NextApiRequest, NextApiResponse } from "next"; + +import { Authenticator, getAPIKey } from "@app/lib/auth"; +import { ReturnedAPIErrorType } from "@app/lib/error"; +import { Conversation } from "@app/lib/models"; +import { generateModelSId } from "@app/lib/utils"; +import { apiError, withLogging } from "@app/logger/withlogging"; +import { ConversationType } from "@app/types/assistant/conversation"; + +async function handler( + req: NextApiRequest, + res: NextApiResponse +): Promise { + const keyRes = await getAPIKey(req); + if (keyRes.isErr()) { + return apiError(req, res, keyRes.error); + } + + const { auth, keyWorkspaceId } = await Authenticator.fromKey( + keyRes.value, + req.query.wId as string + ); + + if (!keyRes.value.isSystem) { + return apiError(req, res, { + status_code: 400, + api_error: { + type: "invalid_request_error", + message: + "The Assitant API is only accessible by system API Key. Ping us at team@dust.tt if you want access to it.", + }, + }); + } + + if (keyWorkspaceId !== req.query.wId) { + return apiError(req, res, { + status_code: 400, + api_error: { + type: "invalid_request_error", + message: "The Assistant API is only available on your own workspace.", + }, + }); + } + + switch (req.method) { + case "POST": + const conv = await Conversation.create({ + sId: generateModelSId(), + title: req.body.title, + created: new Date(), + visibility: req.body.visibility, + }); + return res.status(200).json({ + id: conv.id, + created: conv.created.getTime(), + sId: conv.sId, + title: conv.title, + visibility: conv.visibility, + content: [], + }); + + default: + return apiError(req, res, { + status_code: 405, + api_error: { + type: "method_not_supported_error", + message: "The method passed is not supported, POST is expected.", + }, + }); + } +} + +export default withLogging(handler);