From c7fe139daddfc00b1253e7a31648be983c79960b Mon Sep 17 00:00:00 2001
From: Walter Rafelsberger
Date: Tue, 30 Jan 2024 11:12:05 +0100
Subject: [PATCH] fix llm stream subscription

---
 .../execute_custom_llm_chain/index.ts         | 56 +++++++------------
 1 file changed, 20 insertions(+), 36 deletions(-)

diff --git a/x-pack/plugins/elastic_assistant/server/lib/langchain/execute_custom_llm_chain/index.ts b/x-pack/plugins/elastic_assistant/server/lib/langchain/execute_custom_llm_chain/index.ts
index f98b3710e19f7..9673d6c23752d 100644
--- a/x-pack/plugins/elastic_assistant/server/lib/langchain/execute_custom_llm_chain/index.ts
+++ b/x-pack/plugins/elastic_assistant/server/lib/langchain/execute_custom_llm_chain/index.ts
@@ -10,7 +10,6 @@
 import { BufferMemory, ChatMessageHistory } from 'langchain/memory';
 import { Tool } from '@langchain/core/tools';
 import { streamFactory } from '@kbn/ml-response-stream/server';
-import { RunLogPatch } from '@langchain/core/dist/tracers/log_stream';
 import { ElasticsearchStore } from '../elasticsearch_store/elasticsearch_store';
 import { ActionsClientChatOpenAI } from '../llm/openai';
 import { ActionsClientLlm } from '../llm/actions_client_llm';
@@ -111,7 +110,7 @@ export const callAgentExecutor: AgentExecutor = async ({
     ? await initializeAgentExecutorWithOptions(tools, llm, {
         agentType: 'openai-functions',
         memory,
-        verbose: false,
+        verbose: true,
       })
     : await initializeAgentExecutorWithOptions(tools, llm, {
         agentType: 'chat-conversational-react-description',
@@ -125,10 +124,28 @@
 
   let traceData;
   if (isStream) {
-    const logStream = executor.streamLog(
+    const {
+      end: streamEnd,
+      push,
+      responseWithHeaders,
+    } = streamFactory<{ type: string; payload: string }>(request.headers, logger, false, false);
+
+    executor.stream(
       {
         input: latestMessage[0].content,
         chat_history: [],
+      },
+      {
+        callbacks: [
+          {
+            handleLLMNewToken(payload) {
+              push({ payload, type: 'content' });
+            },
+            handleLLMEnd() {
+              streamEnd();
+            },
+          },
+        ],
       }
       // TODO before merge to main
       // uncomment
@@ -139,15 +156,6 @@
       // }
     );
 
-    const {
-      end: streamEnd,
-      push,
-      responseWithHeaders,
-    } = streamFactory<{ type: string; payload: string }>(request.headers, logger, false, false);
-
-    // Do not call this using `await` so it will run asynchronously while we return the stream in responseWithHeaders
-    readStream(logStream, push, streamEnd);
-    console.log('returning responseWithHeaders', responseWithHeaders);
     // TODO before merge to main
     // figure out how to pass trace_data and replacements @spong @macri @yuliia
     return responseWithHeaders;
@@ -187,27 +195,3 @@
     },
   };
 };
-async function readStream(
-  logStream: AsyncGenerator<RunLogPatch>,
-  push: (arg0: { type: string; payload: string }) => void,
-  streamEnd: () => void
-) {
-  push({ type: 'starting', payload: 'hello world' });
-  for await (const chunk of logStream) {
-    if (chunk.ops?.length > 0 && chunk.ops[0].op === 'add') {
-      const addOp = chunk.ops[0];
-      if (
-        addOp.path.startsWith('/logs/ActionsClientChatOpenAI') &&
-        typeof addOp.value === 'string' &&
-        addOp.value.length
-      ) {
-        push({ type: 'content', payload: addOp.value });
-        await new Promise((resolve) => setTimeout(resolve, 0));
-      }
-    }
-  }
-
-  push({ type: 'after', payload: 'hello world' });
-  streamEnd();
-  push({ type: 'streamEnd', payload: 'hello world' });
-}
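
Note on the pattern: the deleted readStream helper polled executor.streamLog() and filtered `add` ops out of the run-log patches, whereas the patch subscribes directly via LangChain callbacks, pushing each token into the stream created by streamFactory and closing it from handleLLMEnd. The sketch below is a minimal, dependency-free TypeScript illustration of that push-based subscription; StreamChunk, makeStream, and fakeExecutorStream are illustrative stand-ins, not the actual Kibana streamFactory or LangChain executor APIs.

interface StreamChunk {
  type: string;
  payload: string;
}

// Stand-in for streamFactory: returns `push` to queue chunks, `end` to
// close the stream, and an async generator the HTTP layer would drain.
function makeStream() {
  const queue: StreamChunk[] = [];
  let done = false;
  let wake: (() => void) | null = null;

  const notify = () => {
    if (wake) {
      wake();
      wake = null;
    }
  };

  async function* chunks(): AsyncGenerator<StreamChunk> {
    for (;;) {
      while (queue.length > 0) {
        yield queue.shift() as StreamChunk;
      }
      if (done) return;
      // Park until the producer pushes another chunk or ends the stream.
      await new Promise<void>((resolve) => {
        wake = resolve;
      });
    }
  }

  return {
    push: (chunk: StreamChunk) => {
      queue.push(chunk);
      notify();
    },
    end: () => {
      done = true;
      notify();
    },
    chunks,
  };
}

// Minimal shape of the two LangChain-style callbacks the patch relies on.
interface LlmCallbacks {
  handleLLMNewToken(token: string): void;
  handleLLMEnd(): void;
}

// Stand-in executor: emits a few tokens with artificial latency, invoking
// the callbacks the way a streaming LLM run would.
async function fakeExecutorStream(callbacks: LlmCallbacks): Promise<void> {
  for (const token of ['Hello', ', ', 'world', '!']) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    callbacks.handleLLMNewToken(token);
  }
  callbacks.handleLLMEnd();
}

async function main() {
  const { push, end, chunks } = makeStream();

  // Wire the callbacks to the stream the way the patch does: each token
  // becomes a `content` chunk; handleLLMEnd closes the response stream.
  // Deliberately not awaited, so consumption can begin immediately.
  void fakeExecutorStream({
    handleLLMNewToken: (payload) => push({ type: 'content', payload }),
    handleLLMEnd: () => end(),
  });

  for await (const chunk of chunks()) {
    console.log(chunk); // e.g. { type: 'content', payload: 'Hello' }
  }
}

void main();

As in the patch, the producer is never awaited: tokens are queued by `push` as the callbacks fire while the consumer drains the stream concurrently, which is what allows responseWithHeaders to be returned before the LLM run completes.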