From 7c3092ebca24c19d144cc5069720ac7a03c898ce Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Mon, 2 Oct 2023 12:10:49 +0200 Subject: [PATCH] Slack bot cleanup (no repeat GET) (#1910) * WIP clean-up * Dust slack bot clean-up --- connectors/src/connectors/slack/bot.ts | 70 ++-- connectors/src/lib/dust_api.ts | 473 +++++++++---------------- 2 files changed, 215 insertions(+), 328 deletions(-) diff --git a/connectors/src/connectors/slack/bot.ts b/connectors/src/connectors/slack/bot.ts index ce30ae427868..70370487fca2 100644 --- a/connectors/src/connectors/slack/bot.ts +++ b/connectors/src/connectors/slack/bot.ts @@ -1,7 +1,7 @@ import { AgentGenerationSuccessEvent, + AgentMessageType, DustAPI, - PostMessagesRequestBodySchema, } from "@connectors/lib/dust_api"; import { Connector, @@ -188,38 +188,58 @@ async function botAnswerMessage( } } - const messagePayload: PostMessagesRequestBodySchema = { - content: message, - mentions: [{ configurationId: "dust" }], - context: { - timezone: slackChatBotMessage.slackTimezone || "Europe/Paris", - username: slackChatBotMessage.slackUserName, - fullName: - slackChatBotMessage.slackFullName || slackChatBotMessage.slackUserName, - email: slackChatBotMessage.slackEmail, - profilePictureUrl: slackChatBotMessage.slackAvatar || null, + const convRes = await dustAPI.createConversation({ + title: null, + visibility: "unlisted", + message: { + content: message, + mentions: [{ configurationId: "dust" }], + context: { + timezone: slackChatBotMessage.slackTimezone || "Europe/Paris", + username: slackChatBotMessage.slackUserName, + fullName: + slackChatBotMessage.slackFullName || + slackChatBotMessage.slackUserName, + email: slackChatBotMessage.slackEmail, + profilePictureUrl: slackChatBotMessage.slackAvatar || null, + }, }, - }; - const convRes = await dustAPI.createConversation( - null, - "unlisted", - messagePayload - ); + }); + if (convRes.isErr()) { return new Err(new Error(convRes.error.message)); } - const conv = convRes.value; - const stream = await conv.stream; - if (stream.isErr()) { - return new Err(new Error(stream.error.message)); - } - slackChatBotMessage.conversationId = conv.conversation.sId; + const conversation = convRes.value; + + slackChatBotMessage.conversationId = conversation.sId; await slackChatBotMessage.save(); + const agentMessages = conversation.content + .map((versions) => { + const m = versions[versions.length - 1]; + return m; + }) + .filter((m) => { + return m && m.type === "agent_message"; + }); + if (agentMessages.length === 0) { + return new Err(new Error("Failed to retrieve agent message")); + } + const agentMessage = agentMessages[0] as AgentMessageType; + + const streamRes = await dustAPI.streamAgentMessageEvents({ + conversation, + message: agentMessage, + }); + + if (streamRes.isErr()) { + return new Err(new Error(streamRes.error.message)); + } + let fullAnswer = ""; let lastSentDate = new Date(); - for await (const event of stream.value.eventStream) { + for await (const event of streamRes.value.eventStream) { switch (event.type) { case "user_message_error": { return new Err( @@ -254,7 +274,7 @@ async function botAnswerMessage( const finalAnswer = `${_removeCiteMention( event.text )}\n\n <${DUST_API}/w/${connector.workspaceId}/assistant/${ - conv.conversation.sId + conversation.sId }|Continue this conversation on Dust>`; await slackClient.chat.update({ diff --git a/connectors/src/lib/dust_api.ts b/connectors/src/lib/dust_api.ts index 4578eb64b5a6..9e28bb83ec2d 100644 --- a/connectors/src/lib/dust_api.ts +++ b/connectors/src/lib/dust_api.ts @@ -38,76 +38,14 @@ type DustAPICredentials = { workspaceId: string; }; -type ChatRetrievedDocumentType = { - dataSourceId: string; - sourceUrl: string; - documentId: string; - timestamp: string; - tags: string[]; - score: number; - chunks: { - text: string; - offset: number; - score: number; - }[]; -}; - -type MessageFeedbackStatus = "positive" | "negative" | null; - -type MessageRole = "user" | "retrieval" | "assistant" | "error"; -type ChatMessageType = { - sId: string; - role: MessageRole; - message?: string; // for `user`, `assistant` and `error` messages - retrievals?: ChatRetrievedDocumentType[]; // for `retrieval` messages - query?: string; // for `retrieval` messages (not persisted) - feedback?: MessageFeedbackStatus; -}; - -type ChatSessionType = { - id: number; - userId: number; +export type AgentActionSuccessEvent = { + type: "agent_action_success"; created: number; - sId: string; - title?: string; - messages?: ChatMessageType[]; - visibility: string; -}; - -// Event sent when the session is initially created. -type ChatSessionCreateEvent = { - type: "chat_session_create"; - session: ChatSessionType; -}; - -// Event sent when we know what will be the type of the next message. It is sent initially when the -// user message is created for consistency and then each time we know we're going for a retrieval or -// an assistant response. -type ChatMessageTriggerEvent = { - type: "chat_message_trigger"; - role: MessageRole; - // We might want to add some data here in the future e.g including - // information about the query being used in the case of retrieval. -}; - -// Event sent once the message is fully constructed. -type ChatMessageCreateEvent = { - type: "chat_message_create"; - message: ChatMessageType; -}; - -// Event sent when receiving streamed response from the model. -type ChatMessageTokensEvent = { - type: "chat_message_tokens"; + configurationId: string; messageId: string; - text: string; + action: AgentActionType; }; -// Event sent when the session is updated (eg title is set). -export type ChatSessionUpdateEvent = { - type: "chat_session_update"; - session: ChatSessionType; -}; // Event sent when tokens are streamed as the the agent is generating a message. export type GenerationTokensEvent = { type: "generation_tokens"; @@ -279,7 +217,55 @@ export type UserMessageType = { content: string; context: UserMessageContext; }; + +/** + * Agent messages + */ + +export type AgentActionType = RetrievalActionType; + +export type RetrievalActionType = { + type: "retrieval_action"; + params: { + relativeTimeFrame: TimeFrame | null; + query: string | null; + topK: number; + }; + documents: RetrievalDocumentType[] | null; +}; + +export type RetrievalDocumentType = { + dataSourceId: string; + sourceUrl: string | null; + documentId: string; + reference: string; // Short random string so that the model can refer to the document. + timestamp: number; + tags: string[]; + score: number; + chunks: { + text: string; + offset: number; + score: number; + }[]; +}; + +export type TimeFrame = { + duration: number; + unit: TimeframeUnit; +}; + +export const TIME_FRAME_UNITS = [ + "hour", + "day", + "week", + "month", + "year", +] as const; + +export type TimeframeUnit = (typeof TIME_FRAME_UNITS)[number]; + export type AgentMessageStatus = "created" | "succeeded" | "failed"; + /** * Both `action` and `message` are optional (we could have a no-op agent basically). * @@ -297,15 +283,18 @@ export type AgentMessageType = { // configuration: AgentConfigurationType; status: AgentMessageStatus; - // action: AgentActionType | null; + action: AgentActionType | null; content: string | null; - // feedbacks: UserFeedbackType[]; error: { code: string; message: string; } | null; }; +/** + * Conversation + */ + export type ConversationType = { id: ModelId; created: number; @@ -316,192 +305,6 @@ export type ConversationType = { content: (UserMessageType[] | AgentMessageType[])[]; }; -/** - * This help functions process a streamed response in the format of the Dust API for running - * streamed apps. - * - * @param res an HTTP response ready to be consumed as a stream - */ -export async function processStreamedChatResponse(res: Response) { - if (!res.ok || !res.body) { - return new Err({ - type: "dust_api_error", - message: `Error running streamed app: status_code=${ - res.status - } - message=${await res.text()}`, - }); - } - - let pendingEvents: ( - | ChatMessageTriggerEvent - | ChatSessionCreateEvent - | ChatMessageCreateEvent - | ChatMessageTokensEvent - | ChatSessionUpdateEvent - )[] = []; - - const parser = createParser((event) => { - if (event.type === "event") { - if (event.data) { - try { - const data = JSON.parse(event.data); - - switch (data.type) { - case "chat_session_create": { - pendingEvents.push(data as ChatSessionCreateEvent); - break; - } - case "chat_message_trigger": { - pendingEvents.push(data as ChatMessageTriggerEvent); - break; - } - case "chat_message_create": { - pendingEvents.push(data as ChatMessageCreateEvent); - break; - } - case "chat_message_tokens": { - pendingEvents.push(data as ChatMessageTokensEvent); - break; - } - case "chat_session_update": { - pendingEvents.push(data as ChatSessionUpdateEvent); - break; - } - } - } catch (err) { - logger.error({ error: err }, "Failed parsing chunk from Dust API"); - } - } - } - }); - - const reader = res.body.getReader(); - - const streamEvents = async function* () { - try { - while (true) { - const { done, value } = await reader.read(); - if (done) { - break; - } - parser.feed(new TextDecoder().decode(value)); - for (const event of pendingEvents) { - yield event; - } - pendingEvents = []; - } - } catch (e) { - yield { - type: "error", - content: { - code: "stream_error", - message: "Error streaming chunks", - }, - } as DustAppRunErrorEvent; - logger.error( - { - error: e, - }, - "Error streaming chunks." - ); - } finally { - reader.releaseLock(); - } - }; - - return new Ok({ eventStream: streamEvents() }); -} - -/** - * This help functions process a streamed response in the format of the Dust API for running - * streamed apps. - * - * @param res an HTTP response ready to be consumed as a stream - */ -export async function processCreateConversationEvents(res: Response) { - if (!res.ok || !res.body) { - return new Err({ - type: "dust_api_error", - message: `Error running streamed app: status_code=${ - res.status - } - message=${await res.text()}`, - }); - } - - let pendingEvents: ( - | UserMessageErrorEvent - | AgentErrorEvent - | GenerationTokensEvent - | AgentGenerationSuccessEvent - )[] = []; - - const parser = createParser((event) => { - if (event.type === "event") { - if (event.data) { - try { - const data = JSON.parse(event.data).data; - switch (data.type) { - case "user_message_error": { - pendingEvents.push(data as UserMessageErrorEvent); - break; - } - case "agent_error": { - pendingEvents.push(data as AgentErrorEvent); - break; - } - case "generation_tokens": { - pendingEvents.push(data as GenerationTokensEvent); - break; - } - case "agent_generation_success": { - pendingEvents.push(data as AgentGenerationSuccessEvent); - break; - } - } - } catch (err) { - logger.error({ error: err }, "Failed parsing chunk from Dust API"); - } - } - } - }); - - const reader = res.body.getReader(); - - const streamEvents = async function* () { - try { - while (true) { - const { done, value } = await reader.read(); - if (done) { - break; - } - parser.feed(new TextDecoder().decode(value)); - for (const event of pendingEvents) { - yield event; - } - pendingEvents = []; - } - } catch (e) { - yield { - type: "error", - content: { - code: "stream_error", - message: "Error streaming chunks", - }, - } as DustAppRunErrorEvent; - logger.error( - { - error: e, - }, - "Error streaming chunks." - ); - } finally { - reader.releaseLock(); - } - }; - - return new Ok({ eventStream: streamEvents() }); -} - export class DustAPI { _credentials: DustAPICredentials; @@ -540,17 +343,15 @@ export class DustAPI { return new Ok(json.data_sources as DataSourceType[]); } - async createConversation( - title: string | null, - visibility: ConversationVisibility, - message: PostMessagesRequestBodySchema - ) { - const requestPayload: PostConversationsRequestBodySchema = { - title: null, - visibility, - message, - }; - + // When creating a conversation with a user message, the API returns only after the user message + // was created (and if applicable the assocaited agent messages). + async createConversation({ + title, + visibility, + message, + }: PostConversationsRequestBodySchema): Promise< + Result + > { const res = await fetch( `${DUST_API}/api/v1/w/${this.workspaceId()}/assistant/conversations`, { @@ -559,7 +360,11 @@ export class DustAPI { Authorization: `Bearer ${this._credentials.apiKey}`, "Content-Type": "application/json", }, - body: JSON.stringify(requestPayload), + body: JSON.stringify({ + title, + visibility, + message, + }), } ); @@ -567,61 +372,123 @@ export class DustAPI { if (json.error) { return new Err(json.error as DustAPIErrorResponse); } - const conv = json.conversation as { sId: string }; - const agentMessageRes: Result = - await (async () => { - // looping 10 times to get the first agent message, we should listen on conversation stream - // but this works for now as we have only one answer and are under time pressure. - for (let i = 0; i < 10; i++) { - const conversation = await this.getConversation(conv.sId); - if (conversation.isOk()) { - const agentMessage = conversation.value.content - .flat() - .filter((m) => { - return m.type === "agent_message"; - }); - if (agentMessage.length > 0) { - return new Ok(agentMessage[0] as AgentMessageType); - } - } - await new Promise((r) => setTimeout(r, 1000)); - } - - return new Err( - new Error( - `Timeout waiting for agent message for conversation ${conv.sId}` - ) - ); - })(); - if (agentMessageRes.isErr()) { - return agentMessageRes; - } + return new Ok(json.conversation as ConversationType); + } + async streamAgentMessageEvents({ + conversation, + message, + }: { + conversation: ConversationType; + message: AgentMessageType; + }) { const headers = { "Content-Type": "application/json", Authorization: `Bearer ${this._credentials.apiKey}`, }; - const streamRes = await fetch( + const res = await fetch( `${DUST_API}/api/v1/w/${this.workspaceId()}/assistant/conversations/${ - conv.sId - }/messages/${agentMessageRes.value.sId}/events`, + conversation.sId + }/messages/${message.sId}/events`, { method: "GET", headers: headers, } ); - return new Ok({ - stream: processCreateConversationEvents(streamRes), - conversation: conv, + if (!res.ok || !res.body) { + return new Err({ + type: "dust_api_error", + message: `Error running streamed app: status_code=${ + res.status + } - message=${await res.text()}`, + }); + } + + let pendingEvents: ( + | UserMessageErrorEvent + | AgentErrorEvent + | AgentActionSuccessEvent + | GenerationTokensEvent + | AgentGenerationSuccessEvent + )[] = []; + + const parser = createParser((event) => { + if (event.type === "event") { + if (event.data) { + try { + const data = JSON.parse(event.data).data; + switch (data.type) { + case "user_message_error": { + pendingEvents.push(data as UserMessageErrorEvent); + break; + } + case "agent_error": { + pendingEvents.push(data as AgentErrorEvent); + break; + } + case "agent_action_success": { + pendingEvents.push(data as AgentActionSuccessEvent); + break; + } + case "generation_tokens": { + pendingEvents.push(data as GenerationTokensEvent); + break; + } + case "agent_generation_success": { + pendingEvents.push(data as AgentGenerationSuccessEvent); + break; + } + } + } catch (err) { + logger.error({ error: err }, "Failed parsing chunk from Dust API"); + } + } + } }); + + const reader = res.body.getReader(); + + const streamEvents = async function* () { + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + parser.feed(new TextDecoder().decode(value)); + for (const event of pendingEvents) { + yield event; + } + pendingEvents = []; + } + } catch (e) { + yield { + type: "error", + content: { + code: "stream_error", + message: "Error streaming chunks", + }, + } as DustAppRunErrorEvent; + logger.error( + { + error: e, + }, + "Error streaming chunks." + ); + } finally { + reader.releaseLock(); + } + }; + + return new Ok({ eventStream: streamEvents() }); } - async getConversation(conversationid: string) { + async getConversation(conversationId: string) { const res = await fetch( - `${DUST_API}/api/v1/w/${this.workspaceId()}/assistant/conversations/${conversationid}`, + `${DUST_API}/api/v1/w/${this.workspaceId()}/assistant/conversations/${conversationId}`, { method: "GET", headers: {