Skip to content

Commit

Permalink
Add APIs: POST /conv, POST conv/message, GET conv/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
lasryaric committed Sep 8, 2023
1 parent 9f31ebb commit 8e5932d
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 17 deletions.
37 changes: 21 additions & 16 deletions front/lib/api/assistant/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,27 @@ export async function* getConversationEvents(
}> {
const pubsubChannel = getConversationChannelId(conversationSID);
const client = await redisClient();
const events = await client.xRead(
{ key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" },
{ COUNT: 1, BLOCK: 10000 }
);
if (!events) {
return;
}
for (const event of events) {
for (const message of event.messages) {
const payloadStr = message.message["payload"];
const messageId = message.id;
const payload = JSON.parse(payloadStr);
yield {
eventId: messageId,
data: payload,
};

while (true) {
const events = await client.xRead(
{ key: pubsubChannel, id: lastEventId ? lastEventId : "0-0" },
{ COUNT: 1, BLOCK: 10000 }
);
if (!events) {
console.log("Nothing here", conversationSID, lastEventId);
return;
}
for (const event of events) {
for (const message of event.messages) {
const payloadStr = message.message["payload"];
const messageId = message.id;
const payload = JSON.parse(payloadStr);
lastEventId = messageId;
yield {
eventId: messageId,
data: payload,
};
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion front/lib/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ export type APIErrorType =
| "extracted_event_not_found"
| "connector_update_error"
| "connector_update_unauthorized"
| "connector_oauth_target_mismatch";
| "connector_oauth_target_mismatch"
| "conversation_not_found";

export type APIError = {
type: APIErrorType;
Expand Down
109 changes: 109 additions & 0 deletions front/pages/api/v1/w/[wId]/assistant/[cId]/messages/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { NextApiRequest, NextApiResponse } from "next";

import { postUserMessageWithPubSub } from "@app/lib/api/assistant/pubsub";
import { Authenticator, getAPIKey } from "@app/lib/auth";
import { ReturnedAPIErrorType } from "@app/lib/error";
import { Conversation } from "@app/lib/models";
import { apiError, withLogging } from "@app/logger/withlogging";

async function handler(
req: NextApiRequest,
res: NextApiResponse<ReturnedAPIErrorType>
): Promise<void> {
const keyRes = await getAPIKey(req);
if (keyRes.isErr()) {
return apiError(req, res, keyRes.error);
}

const { auth, keyWorkspaceId } = await Authenticator.fromKey(
keyRes.value,
req.query.wId as string
);

if (!keyRes.value.isSystem) {
return apiError(req, res, {
status_code: 400,
api_error: {
type: "invalid_request_error",
message:
"The Assitant API is only accessible by system API Key. Ping us at [email protected] if you want access to it.",
},
});
}

if (keyWorkspaceId !== req.query.wId) {
return apiError(req, res, {
status_code: 400,
api_error: {
type: "invalid_request_error",
message: "The Assistant API is only available on your own workspace.",
},
});
}

const conv = await Conversation.findOne({
where: {
sId: req.query.cId as string,
},
});
if (!conv) {
return apiError(req, res, {
status_code: 404,
api_error: {
type: "conversation_not_found",
message: "Conversation not found.",
},
});
}

// no time for actual io-ts parsing right now, so here is the expected structure.
// Will handle proper parsing later.
const payload = req.body as {
message: string;
context: {
timezone: string;
username: string;
fullName: string;
email: string;
profilePictureUrl: string;
};
};

switch (req.method) {
case "POST":
const p = postUserMessageWithPubSub(auth, {
conversation: {
id: conv.id,
created: conv.created.getTime(),
sId: conv.sId,
title: conv.title,
// not sure how to provide the content here for now.
content: [],
visibility: conv.visibility,
},
message: payload.message,
mentions: [],
context: {
timezone: payload.context.timezone,
username: payload.context.username,
fullName: payload.context.fullName,
email: payload.context.email,
profilePictureUrl: payload.context.profilePictureUrl,
},
});
res.status(200).end();
await p;
return;

default:
return apiError(req, res, {
status_code: 405,
api_error: {
type: "method_not_supported_error",
message: "The method passed is not supported, POST is expected.",
},
});
}
}

export default withLogging(handler);
87 changes: 87 additions & 0 deletions front/pages/api/v1/w/[wId]/assistant/conversation/[cId]/sub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { NextApiRequest, NextApiResponse } from "next";

import { getConversationEvents } from "@app/lib/api/assistant/pubsub";
import { Authenticator, getAPIKey } from "@app/lib/auth";
import { ReturnedAPIErrorType } from "@app/lib/error";
import { Conversation } from "@app/lib/models";
import { apiError, withLogging } from "@app/logger/withlogging";

async function handler(
req: NextApiRequest,
res: NextApiResponse<ReturnedAPIErrorType>
): Promise<void> {
const keyRes = await getAPIKey(req);
if (keyRes.isErr()) {
return apiError(req, res, keyRes.error);
}

if (!keyRes.value.isSystem) {
return apiError(req, res, {
status_code: 400,
api_error: {
type: "invalid_request_error",
message:
"The Assitant API is only accessible by system API Key. Ping us at [email protected] if you want access to it.",
},
});
}
const { keyWorkspaceId } = await Authenticator.fromKey(
keyRes.value,
req.query.wId as string
);

if (keyWorkspaceId !== req.query.wId) {
return apiError(req, res, {
status_code: 400,
api_error: {
type: "invalid_request_error",
message: "The Assistant API is only available on your own workspace.",
},
});
}

const conv = await Conversation.findOne({
where: {
sId: req.query.cId as string,
},
});
if (!conv) {
return apiError(req, res, {
status_code: 404,
api_error: {
type: "conversation_not_found",
message: "Conversation not found.",
},
});
}

switch (req.method) {
case "GET": {
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});

for await (const event of getConversationEvents(conv.sId, null)) {
res.write(event);
// @ts-expect-error we need to flush for streaming but TS thinks flush() does not exists.
res.flush();
}

res.end();
return;
}

default:
return apiError(req, res, {
status_code: 405,
api_error: {
type: "method_not_supported_error",
message: "The method passed is not supported, POST is expected.",
},
});
}
}

export default withLogging(handler);
73 changes: 73 additions & 0 deletions front/pages/api/v1/w/[wId]/assistant/conversation/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { NextApiRequest, NextApiResponse } from "next";

import { Authenticator, getAPIKey } from "@app/lib/auth";
import { ReturnedAPIErrorType } from "@app/lib/error";
import { Conversation } from "@app/lib/models";
import { generateModelSId } from "@app/lib/utils";
import { apiError, withLogging } from "@app/logger/withlogging";
import { ConversationType } from "@app/types/assistant/conversation";

async function handler(
req: NextApiRequest,
res: NextApiResponse<ConversationType | ReturnedAPIErrorType>
): Promise<void> {
const keyRes = await getAPIKey(req);
if (keyRes.isErr()) {
return apiError(req, res, keyRes.error);
}

const { auth, keyWorkspaceId } = await Authenticator.fromKey(
keyRes.value,
req.query.wId as string
);

if (!keyRes.value.isSystem) {
return apiError(req, res, {
status_code: 400,
api_error: {
type: "invalid_request_error",
message:
"The Assitant API is only accessible by system API Key. Ping us at [email protected] if you want access to it.",
},
});
}

if (keyWorkspaceId !== req.query.wId) {
return apiError(req, res, {
status_code: 400,
api_error: {
type: "invalid_request_error",
message: "The Assistant API is only available on your own workspace.",
},
});
}

switch (req.method) {
case "POST":
const conv = await Conversation.create({
sId: generateModelSId(),
title: req.body.title,
created: new Date(),
visibility: req.body.visibility,
});
return res.status(200).json({
id: conv.id,
created: conv.created.getTime(),
sId: conv.sId,
title: conv.title,
visibility: conv.visibility,
content: [],
});

default:
return apiError(req, res, {
status_code: 405,
api_error: {
type: "method_not_supported_error",
message: "The method passed is not supported, POST is expected.",
},
});
}
}

export default withLogging(handler);

0 comments on commit 8e5932d

Please sign in to comment.