From 4dac74a09010a5a8a01ea130ebca65f509bd959b Mon Sep 17 00:00:00 2001
From: Steph Milovic
Date: Wed, 4 Dec 2024 16:39:09 -0700
Subject: [PATCH 1/5] wip

---
 .../chat_bedrock_converse/bedrock_runtime_client.ts         | 1 +
 .../chat_bedrock_converse/chat_bedrock_converse.ts          | 2 ++
 .../server/language_models/chat_vertex/chat_vertex.ts       | 1 +
 .../lib/langchain/graphs/default_assistant_graph/helpers.ts | 6 ++----
 .../server/connector_types/bedrock/bedrock.ts               | 1 +
 .../server/connector_types/openai/openai.ts                 | 4 +++-
 6 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/bedrock_runtime_client.ts b/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/bedrock_runtime_client.ts
index 7f20591bd51a4..9982e596da813 100644
--- a/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/bedrock_runtime_client.ts
+++ b/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/bedrock_runtime_client.ts
@@ -46,6 +46,7 @@ export class BedrockRuntimeClient extends _BedrockRuntimeClient {
     optionsOrCb?: HttpHandlerOptions | ((err: unknown, data: unknown) => void)
   ) {
     const options = typeof optionsOrCb !== 'function' ? optionsOrCb : {};
+    console.log('==> stream vsignal', options);
     if (command.input.messages) {
       // without this, our human + human messages do not work and result in error:
       // A conversation must alternate between user and assistant roles.
diff --git a/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/chat_bedrock_converse.ts b/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/chat_bedrock_converse.ts
index bdc84130925d6..8fc7c2c61ef4b 100644
--- a/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/chat_bedrock_converse.ts
+++ b/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/chat_bedrock_converse.ts
@@ -40,6 +40,8 @@ export class ActionsClientChatBedrockConverse extends ChatBedrockConverse {
       model: fields?.model ?? DEFAULT_BEDROCK_MODEL,
       region: DEFAULT_BEDROCK_REGION,
     });
+    this.signal = fields.signal;
+    console.log('==> ActionsClientChatBedrockConverse fields', fields);
     this.client = new BedrockRuntimeClient({
       actionsClient,
       connectorId,
diff --git a/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/chat_vertex.ts b/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/chat_vertex.ts
index 7cea2d421a9da..447e70919978a 100644
--- a/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/chat_vertex.ts
+++ b/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/chat_vertex.ts
@@ -78,6 +78,7 @@ export class ActionsClientChatVertexAI extends ChatVertexAI {
     const parameters = this.invocationParams(options);
     const data = await this.connection.formatData(messages, parameters);
     const stream = await this.caller.callWithOptions({ signal: options?.signal }, async () => {
+      console.log('==> stream signal', options?.signal);
       const systemPart: GeminiPartText | undefined = data?.systemInstruction
         ?.parts?.[0] as unknown as GeminiPartText;
       const systemInstruction = systemPart?.text.length
diff --git a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts
index 0126692b5b6a5..c7f0e5b008d04 100644
--- a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts
+++ b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts
@@ -65,6 +65,7 @@ export const streamGraph = async ({
   let didEnd = false;
 
   const handleStreamEnd = (finalResponse: string, isError = false) => {
+    console.log('===> handleStreamEnd', { finalResponse, isError });
     if (onLlmResponse) {
       onLlmResponse(
         finalResponse,
@@ -160,10 +161,7 @@ export const streamGraph = async ({
           finalMessage += msg.content;
         }
       } else if (event.event === 'on_llm_end' && !didEnd) {
-        const generations = event.data.output?.generations[0];
-        if (generations && generations[0]?.generationInfo.finish_reason === 'stop') {
-          handleStreamEnd(generations[0]?.text ?? finalMessage);
-        }
+        handleStreamEnd(event.data.output?.generations[0]?.text ?? finalMessage);
       }
     }
   }
diff --git a/x-pack/plugins/stack_connectors/server/connector_types/bedrock/bedrock.ts b/x-pack/plugins/stack_connectors/server/connector_types/bedrock/bedrock.ts
index 339efa49f69bf..22247ba4f0c49 100644
--- a/x-pack/plugins/stack_connectors/server/connector_types/bedrock/bedrock.ts
+++ b/x-pack/plugins/stack_connectors/server/connector_types/bedrock/bedrock.ts
@@ -445,6 +445,7 @@ The Kibana Connector in use may need to be reconfigured with an updated Amazon B
     { signal, command }: ConverseActionParams,
     connectorUsageCollector: ConnectorUsageCollector
   ): Promise {
+    console.log('==> bedrockClientSend signal', signal);
     connectorUsageCollector.addRequestBodyBytes(undefined, command);
     const res = await this.bedrockClient.send(command, {
       abortSignal: signal,
diff --git a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts
index 6cadc322a3d78..70a536c670aea 100644
--- a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts
+++ b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts
@@ -210,7 +210,7 @@ export class OpenAIConnector extends SubActionConnector {
     );
 
     const axiosOptions = getAxiosOptions(this.provider, this.key, stream);
-
+    console.log('==> streamApi', signal);
     const response = await this.request(
       {
         url: this.url,
@@ -321,6 +321,7 @@ export class OpenAIConnector extends SubActionConnector {
       };
 
       connectorUsageCollector.addRequestBodyBytes(undefined, requestBody);
+      console.log('==> invokeAsyncIterator', signal);
       const stream = await this.openAI.chat.completions.create(requestBody, {
         signal,
         timeout, // do not default if not provided
@@ -330,6 +331,7 @@ export class OpenAIConnector extends SubActionConnector {
       return { consumerStream: teed[0], tokenCountStream: teed[1] };
       // since we do not use the sub action connector request method, we need to do our own error handling
     } catch (e) {
+      console.log('==> invokeAsyncIterator e', e);
       const errorMessage = this.getResponseErrorMessage(e);
       throw new Error(errorMessage);
     }

From 927ed21cbc080c4b963b25f8bf26e2950f826a53 Mon Sep 17 00:00:00 2001
From: Steph Milovic
Date: Wed, 4 Dec 2024 17:06:16 -0700
Subject: [PATCH 2/5] rm logs

---
 .../chat_bedrock_converse/bedrock_runtime_client.ts |  1 -
 .../chat_bedrock_converse/chat_bedrock_converse.ts  |  2 --
 .../graphs/default_assistant_graph/graph.ts         | 17 ++++++++--
 .../graphs/default_assistant_graph/helpers.ts       |  1 -
 .../graphs/default_assistant_graph/index.ts         |  1 +
 .../default_assistant_graph/nodes/respond.ts        |  5 ++-
 .../nodes/run_agent.ts                              | 31 ++++++++++---------
 .../server/connector_types/bedrock/bedrock.ts       |  1 -
 .../server/connector_types/openai/openai.ts         |  3 ---
 9 files changed, 35 insertions(+), 27 deletions(-)

diff --git a/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/bedrock_runtime_client.ts b/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/bedrock_runtime_client.ts
index 9982e596da813..7f20591bd51a4 100644
--- a/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/bedrock_runtime_client.ts
+++ b/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/bedrock_runtime_client.ts
@@ -46,7 +46,6 @@ export class BedrockRuntimeClient extends _BedrockRuntimeClient {
     optionsOrCb?: HttpHandlerOptions | ((err: unknown, data: unknown) => void)
   ) {
     const options = typeof optionsOrCb !== 'function' ? optionsOrCb : {};
-    console.log('==> stream vsignal', options);
     if (command.input.messages) {
       // without this, our human + human messages do not work and result in error:
       // A conversation must alternate between user and assistant roles.
diff --git a/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/chat_bedrock_converse.ts b/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/chat_bedrock_converse.ts
index 8fc7c2c61ef4b..bdc84130925d6 100644
--- a/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/chat_bedrock_converse.ts
+++ b/x-pack/packages/kbn-langchain/server/language_models/chat_bedrock_converse/chat_bedrock_converse.ts
@@ -40,8 +40,6 @@ export class ActionsClientChatBedrockConverse extends ChatBedrockConverse {
       model: fields?.model ?? DEFAULT_BEDROCK_MODEL,
       region: DEFAULT_BEDROCK_REGION,
     });
-    this.signal = fields.signal;
-    console.log('==> ActionsClientChatBedrockConverse fields', fields);
     this.client = new BedrockRuntimeClient({
       actionsClient,
       connectorId,
diff --git a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/graph.ts b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/graph.ts
index 4688caa176b56..10ecebb5e3f9b 100644
--- a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/graph.ts
+++ b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/graph.ts
@@ -34,6 +34,7 @@ export interface GetDefaultAssistantGraphParams {
   dataClients?: AssistantDataClients;
   createLlmInstance: () => BaseChatModel;
   logger: Logger;
+  signal?: AbortSignal;
   tools: StructuredTool[];
   replacements: Replacements;
 }
@@ -45,6 +46,8 @@ export const getDefaultAssistantGraph = ({
   dataClients,
   createLlmInstance,
   logger,
+  // some chat models (bedrock) require a signal to be passed on agent invoke rather than the signal passed to the chat model
+  signal,
   tools,
   replacements,
 }: GetDefaultAssistantGraphParams) => {
@@ -137,11 +140,19 @@ export const getDefaultAssistantGraph = ({
       })
     )
     .addNode(NodeType.AGENT, (state: AgentState) =>
-      runAgent({ ...nodeParams, state, agentRunnable, kbDataClient: dataClients?.kbDataClient })
+      runAgent({
+        ...nodeParams,
+        config: { signal },
+        state,
+        agentRunnable,
+        kbDataClient: dataClients?.kbDataClient,
+      })
+    )
+    .addNode(NodeType.TOOLS, (state: AgentState) =>
+      executeTools({ ...nodeParams, config: { signal }, state, tools })
     )
-    .addNode(NodeType.TOOLS, (state: AgentState) => executeTools({ ...nodeParams, state, tools }))
     .addNode(NodeType.RESPOND, (state: AgentState) =>
-      respond({ ...nodeParams, state, model: createLlmInstance() })
+      respond({ ...nodeParams, config: { signal }, state, model: createLlmInstance() })
     )
     .addNode(NodeType.MODEL_INPUT, (state: AgentState) => modelInput({ ...nodeParams, state }))
     .addEdge(START, NodeType.MODEL_INPUT)
diff --git a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts
index c7f0e5b008d04..29cdd1c3c5350 100644
--- a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts
+++ b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts
@@ -65,7 +65,6 @@ export const streamGraph = async ({
   let didEnd = false;
 
   const handleStreamEnd = (finalResponse: string, isError = false) => {
-    console.log('===> handleStreamEnd', { finalResponse, isError });
     if (onLlmResponse) {
       onLlmResponse(
         finalResponse,
diff --git a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/index.ts b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/index.ts
index 4ddd3eae11624..60c229b46e61c 100644
--- a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/index.ts
+++ b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/index.ts
@@ -173,6 +173,7 @@ export const callAssistantGraph: AgentExecutor = async ({
     // we need to pass it like this or streaming does not work for bedrock
     createLlmInstance,
     logger,
+    signal: abortSignal,
     tools,
     replacements,
   });
diff --git a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/nodes/respond.ts b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/nodes/respond.ts
index bfd62ee7aab21..76d449373488f 100644
--- a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/nodes/respond.ts
+++ b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/nodes/respond.ts
@@ -7,6 +7,7 @@
 
 import { BaseChatModel } from '@langchain/core/language_models/chat_models';
 import { StringWithAutocomplete } from '@langchain/core/dist/utils/types';
+import { RunnableConfig } from '@langchain/core/runnables';
 import { AGENT_NODE_TAG } from './run_agent';
 import { AgentState, NodeParamsBase } from '../types';
 import { NodeType } from '../constants';
@@ -14,9 +15,11 @@ import { NodeType } from '../constants';
 export interface RespondParams extends NodeParamsBase {
   state: AgentState;
   model: BaseChatModel;
+  config?: RunnableConfig;
 }
 
 export async function respond({
+  config,
   logger,
   state,
   model,
@@ -34,7 +37,7 @@ export async function respond({
 
   const responseMessage = await model
     // use AGENT_NODE_TAG to identify as agent node for stream parsing
-    .withConfig({ runName: 'Summarizer', tags: [AGENT_NODE_TAG] })
+    .withConfig({ runName: 'Summarizer', tags: [AGENT_NODE_TAG], signal: config?.signal })
     .invoke([userMessage]);
 
   return {
diff --git a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/nodes/run_agent.ts b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/nodes/run_agent.ts
index 053254a1d99b3..952b97287c3ca 100644
--- a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/nodes/run_agent.ts
+++ b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/nodes/run_agent.ts
@@ -43,21 +43,22 @@ export async function runAgent({
   logger.debug(() => `${NodeType.AGENT}: Node state:\n${JSON.stringify(state, null, 2)}`);
 
   const knowledgeHistory = await kbDataClient?.getRequiredKnowledgeBaseDocumentEntries();
-
-  const agentOutcome = await agentRunnable.withConfig({ tags: [AGENT_NODE_TAG] }).invoke(
-    {
-      ...state,
-      knowledge_history: `${KNOWLEDGE_HISTORY_PREFIX}\n${
-        knowledgeHistory?.length
-          ? JSON.stringify(knowledgeHistory.map((e) => e.text))
-          : NO_KNOWLEDGE_HISTORY
-      }`,
-      // prepend any user prompt (gemini)
-      input: formatLatestUserMessage(state.input, state.llmType),
-      chat_history: state.messages, // TODO: Message de-dupe with ...state spread
-    },
-    config
-  );
+  const agentOutcome = await agentRunnable
+    .withConfig({ tags: [AGENT_NODE_TAG], signal: config?.signal })
+    .invoke(
+      {
+        ...state,
+        knowledge_history: `${KNOWLEDGE_HISTORY_PREFIX}\n${
+          knowledgeHistory?.length
+            ? JSON.stringify(knowledgeHistory.map((e) => e.text))
+            : NO_KNOWLEDGE_HISTORY
+        }`,
+        // prepend any user prompt (gemini)
+        input: formatLatestUserMessage(state.input, state.llmType),
+        chat_history: state.messages, // TODO: Message de-dupe with ...state spread
+      },
+      config
+    );
 
   return {
     agentOutcome,
diff --git a/x-pack/plugins/stack_connectors/server/connector_types/bedrock/bedrock.ts b/x-pack/plugins/stack_connectors/server/connector_types/bedrock/bedrock.ts
index 22247ba4f0c49..339efa49f69bf 100644
--- a/x-pack/plugins/stack_connectors/server/connector_types/bedrock/bedrock.ts
+++ b/x-pack/plugins/stack_connectors/server/connector_types/bedrock/bedrock.ts
@@ -445,7 +445,6 @@ The Kibana Connector in use may need to be reconfigured with an updated Amazon B
     { signal, command }: ConverseActionParams,
     connectorUsageCollector: ConnectorUsageCollector
   ): Promise {
-    console.log('==> bedrockClientSend signal', signal);
     connectorUsageCollector.addRequestBodyBytes(undefined, command);
     const res = await this.bedrockClient.send(command, {
       abortSignal: signal,
diff --git a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts
index 70a536c670aea..d6c6865d87344 100644
--- a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts
+++ b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts
@@ -210,7 +210,6 @@ export class OpenAIConnector extends SubActionConnector {
     );
 
     const axiosOptions = getAxiosOptions(this.provider, this.key, stream);
-    console.log('==> streamApi', signal);
     const response = await this.request(
       {
         url: this.url,
@@ -321,7 +320,6 @@ export class OpenAIConnector extends SubActionConnector {
       };
 
       connectorUsageCollector.addRequestBodyBytes(undefined, requestBody);
-      console.log('==> invokeAsyncIterator', signal);
       const stream = await this.openAI.chat.completions.create(requestBody, {
         signal,
         timeout, // do not default if not provided
@@ -331,7 +329,6 @@ export class OpenAIConnector extends SubActionConnector {
       return { consumerStream: teed[0], tokenCountStream: teed[1] };
       // since we do not use the sub action connector request method, we need to do our own error handling
     } catch (e) {
-      console.log('==> invokeAsyncIterator e', e);
       const errorMessage = this.getResponseErrorMessage(e);
       throw new Error(errorMessage);
     }

From 8a5ba08f2326462228089f0f04896ec0852c347d Mon Sep 17 00:00:00 2001
From: Steph Milovic
Date: Wed, 4 Dec 2024 17:11:00 -0700
Subject: [PATCH 3/5] pipe through signal for gemini

---
 .../server/language_models/chat_vertex/chat_vertex.ts | 2 +-
 .../server/language_models/chat_vertex/connection.ts  | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/chat_vertex.ts b/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/chat_vertex.ts
index 447e70919978a..5c7a9ef918da3 100644
--- a/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/chat_vertex.ts
+++ b/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/chat_vertex.ts
@@ -78,7 +78,6 @@ export class ActionsClientChatVertexAI extends ChatVertexAI {
     const parameters = this.invocationParams(options);
     const data = await this.connection.formatData(messages, parameters);
     const stream = await this.caller.callWithOptions({ signal: options?.signal }, async () => {
-      console.log('==> stream signal', options?.signal);
       const systemPart: GeminiPartText | undefined = data?.systemInstruction
         ?.parts?.[0] as unknown as GeminiPartText;
       const systemInstruction = systemPart?.text.length
@@ -94,6 +93,7 @@ export class ActionsClientChatVertexAI extends ChatVertexAI {
             tools: data?.tools,
             temperature: this.temperature,
             ...systemInstruction,
+            signal: options?.signal,
           },
         },
       };
diff --git a/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/connection.ts b/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/connection.ts
index 8ce776890acfa..442e6b079db9b 100644
--- a/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/connection.ts
+++ b/x-pack/packages/kbn-langchain/server/language_models/chat_vertex/connection.ts
@@ -82,6 +82,7 @@ export class ActionsClientChatConnection extends ChatConnection {
           tools: data?.tools,
           temperature: this.temperature,
           ...systemInstruction,
+          signal: options?.signal,
         },
       },
     };

From a768bef08703b0f7776c86c9aaf0e159f707c842 Mon Sep 17 00:00:00 2001
From: Steph Milovic
Date: Wed, 4 Dec 2024 17:23:41 -0700
Subject: [PATCH 4/5] revert spacing

---
 .../stack_connectors/server/connector_types/openai/openai.ts | 1 +
 1 file changed, 1 insertion(+)

diff --git a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts
index d6c6865d87344..6cadc322a3d78 100644
--- a/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts
+++ b/x-pack/plugins/stack_connectors/server/connector_types/openai/openai.ts
@@ -210,6 +210,7 @@ export class OpenAIConnector extends SubActionConnector {
     );
 
     const axiosOptions = getAxiosOptions(this.provider, this.key, stream);
+
     const response = await this.request(
       {
         url: this.url,

From b479f0973599c88d8527112a9a2ded7019117721 Mon Sep 17 00:00:00 2001
From: Steph Milovic
Date: Thu, 5 Dec 2024 12:40:10 -0700
Subject: [PATCH 5/5] fix

---
 .../lib/langchain/graphs/default_assistant_graph/helpers.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts
index 29cdd1c3c5350..a4b36dfa8dc22 100644
--- a/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts
+++ b/x-pack/plugins/elastic_assistant/server/lib/langchain/graphs/default_assistant_graph/helpers.ts
@@ -160,7 +160,7 @@ export const streamGraph = async ({
           finalMessage += msg.content;
         }
       } else if (event.event === 'on_llm_end' && !didEnd) {
-        handleStreamEnd(event.data.output?.generations[0]?.text ?? finalMessage);
+        handleStreamEnd(event.data.output?.generations[0][0]?.text ?? finalMessage);
       }
     }
   }