Enable streaming in GoogleAIGemini (#1016)
* Add streaming to GoogleAIGeminiGenerator
Amnah199 authored Aug 27, 2024
1 parent b8d212b commit 611b05d
Showing 4 changed files with 294 additions and 81 deletions.
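The change adds an optional `streaming_callback` parameter to both `GoogleAIGeminiChatGenerator` and `GoogleAIGeminiGenerator`, accepted at construction time or per `run()` call. A minimal usage sketch of the new parameter, assuming the `google-ai-haystack` integration is installed and a `GOOGLE_API_KEY` environment variable is set (the prompt and callback name are illustrative, not from the diff):

    from haystack.dataclasses import ChatMessage, StreamingChunk
    from haystack_integrations.components.generators.google_ai import GoogleAIGeminiChatGenerator

    def print_chunk(chunk: StreamingChunk) -> None:
        # Called once per streamed chunk; chunk.meta carries the raw chunk as a dict.
        print(chunk.content, end="", flush=True)

    generator = GoogleAIGeminiChatGenerator(streaming_callback=print_chunk)
    result = generator.run(messages=[ChatMessage.from_user("Explain streaming in one sentence.")])
    # The complete reply is still returned once the stream is exhausted.
    print(result["replies"][0].content)

Passing a callback switches the underlying `generate_content` call to `stream=True`; leaving it unset preserves the previous non-streaming behavior.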
@@ -1,16 +1,16 @@
import logging
from typing import Any, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Union

import google.generativeai as genai
from google.ai.generativelanguage import Content, Part
from google.ai.generativelanguage import Tool as ToolProto
from google.generativeai import GenerationConfig, GenerativeModel
from google.generativeai.types import HarmBlockThreshold, HarmCategory, Tool
from google.generativeai.types import GenerateContentResponse, HarmBlockThreshold, HarmCategory, Tool
from haystack.core.component import component
from haystack.core.serialization import default_from_dict, default_to_dict
from haystack.dataclasses.byte_stream import ByteStream
from haystack.dataclasses import ByteStream, StreamingChunk
from haystack.dataclasses.chat_message import ChatMessage, ChatRole
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable

logger = logging.getLogger(__name__)

@@ -107,6 +107,7 @@ def __init__(
generation_config: Optional[Union[GenerationConfig, Dict[str, Any]]] = None,
safety_settings: Optional[Dict[HarmCategory, HarmBlockThreshold]] = None,
tools: Optional[List[Tool]] = None,
streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
):
"""
Initializes a `GoogleAIGeminiChatGenerator` instance.
@@ -132,6 +133,8 @@ def __init__(
A dictionary with `HarmCategory` as keys and `HarmBlockThreshold` as values.
For more information, see [the API reference](https://ai.google.dev/api)
:param tools: A list of Tool objects that can be used for [Function calling](https://ai.google.dev/docs/function_calling).
:param streaming_callback: A callback function that is called when a new token is received from the stream.
The callback function accepts StreamingChunk as an argument.
"""

genai.configure(api_key=api_key.resolve_value())
@@ -142,6 +145,7 @@ def __init__(
self._safety_settings = safety_settings
self._tools = tools
self._model = GenerativeModel(self._model_name, tools=self._tools)
self._streaming_callback = streaming_callback

def _generation_config_to_dict(self, config: Union[GenerationConfig, Dict[str, Any]]) -> Dict[str, Any]:
if isinstance(config, dict):
@@ -162,13 +166,16 @@ def to_dict(self) -> Dict[str, Any]:
:returns:
Dictionary with serialized data.
"""
callback_name = serialize_callable(self._streaming_callback) if self._streaming_callback else None

data = default_to_dict(
self,
api_key=self._api_key.to_dict(),
model=self._model_name,
generation_config=self._generation_config,
safety_settings=self._safety_settings,
tools=self._tools,
streaming_callback=callback_name,
)
if (tools := data["init_parameters"].get("tools")) is not None:
data["init_parameters"]["tools"] = []
@@ -213,6 +220,8 @@ def from_dict(cls, data: Dict[str, Any]) -> "GoogleAIGeminiChatGenerator":
data["init_parameters"]["safety_settings"] = {
HarmCategory(k): HarmBlockThreshold(v) for k, v in safety_settings.items()
}
if (serialized_callback_handler := data["init_parameters"].get("streaming_callback")) is not None:
data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
return default_from_dict(cls, data)

def _convert_part(self, part: Union[str, ByteStream, Part]) -> Part:
@@ -274,16 +283,23 @@ def _message_to_content(self, message: ChatMessage) -> Content:
return Content(parts=[part], role=role)

@component.output_types(replies=List[ChatMessage])
def run(self, messages: List[ChatMessage]):
def run(
self,
messages: List[ChatMessage],
streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
):
"""
Generates text based on the provided messages.
:param messages:
A list of `ChatMessage` instances, representing the input messages.
:param streaming_callback:
A callback function that is called when a new token is received from the stream.
:returns:
A dictionary containing the following key:
- `replies`: A list containing the generated responses as `ChatMessage` instances.
"""
streaming_callback = streaming_callback or self._streaming_callback
history = [self._message_to_content(m) for m in messages[:-1]]
session = self._model.start_chat(history=history)

@@ -292,10 +308,22 @@ def run(self, messages: List[ChatMessage]):
content=new_message,
generation_config=self._generation_config,
safety_settings=self._safety_settings,
stream=streaming_callback is not None,
)

replies = self._get_stream_response(res, streaming_callback) if streaming_callback else self._get_response(res)

return {"replies": replies}

def _get_response(self, response_body: GenerateContentResponse) -> List[ChatMessage]:
"""
Extracts the responses from the Google AI response.
:param response_body: The response from the Google AI request.
:returns: The extracted responses.
"""
replies = []
for candidate in res.candidates:
for candidate in response_body.candidates:
for part in candidate.content.parts:
if part.text != "":
replies.append(ChatMessage.from_system(part.text))
@@ -307,5 +335,23 @@ def run(self, messages: List[ChatMessage]):
name=part.function_call.name,
)
)
return replies

return {"replies": replies}
def _get_stream_response(
self, stream: GenerateContentResponse, streaming_callback: Callable[[StreamingChunk], None]
) -> List[ChatMessage]:
"""
Extracts the responses from the Google AI streaming response.
:param stream: The streaming response from the Google AI request.
:param streaming_callback: The handler for the streaming response.
:returns: The extracted response with the content of all streaming chunks.
"""
responses = []
for chunk in stream:
content = chunk.text if len(chunk.parts) > 0 and "text" in chunk.parts[0] else ""
streaming_callback(StreamingChunk(content=content, meta=chunk.to_dict()))
responses.append(content)

combined_response = "".join(responses).lstrip()
return [ChatMessage.from_system(content=combined_response)]
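Worth noting from the `to_dict`/`from_dict` changes above: the callback round-trips through `serialize_callable` and `deserialize_callable`, which store a dotted import path rather than the function object itself. A small sketch of that mechanism in isolation (the callback name is hypothetical):

    from haystack.utils import deserialize_callable, serialize_callable

    def my_callback(chunk) -> None:
        print(chunk.content, end="")

    path = serialize_callable(my_callback)   # a dotted path such as "my_module.my_callback"
    restored = deserialize_callable(path)    # re-imports the function from that path
    assert restored is my_callback

One practical consequence: a lambda passed as `streaming_callback` typically cannot be restored this way, so components that must survive `to_dict()`/`from_dict()` should use a module-level function. The second changed file, below, applies the same pattern to the non-chat `GoogleAIGeminiGenerator`.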
@@ -1,15 +1,15 @@
import logging
from typing import Any, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Union

import google.generativeai as genai
from google.ai.generativelanguage import Content, Part, Tool
from google.generativeai import GenerationConfig, GenerativeModel
from google.generativeai.types import HarmBlockThreshold, HarmCategory
from google.generativeai.types import GenerateContentResponse, HarmBlockThreshold, HarmCategory
from haystack.core.component import component
from haystack.core.component.types import Variadic
from haystack.core.serialization import default_from_dict, default_to_dict
from haystack.dataclasses.byte_stream import ByteStream
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack.dataclasses import ByteStream, StreamingChunk
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable

logger = logging.getLogger(__name__)

@@ -70,6 +70,7 @@ def __init__(
generation_config: Optional[Union[GenerationConfig, Dict[str, Any]]] = None,
safety_settings: Optional[Dict[HarmCategory, HarmBlockThreshold]] = None,
tools: Optional[List[Tool]] = None,
streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
):
"""
Initializes a `GoogleAIGeminiGenerator` instance.
@@ -91,6 +92,8 @@ def __init__(
A dictionary with `HarmCategory` as keys and `HarmBlockThreshold` as values.
For more information, see [the API reference](https://ai.google.dev/api)
:param tools: A list of Tool objects that can be used for [Function calling](https://ai.google.dev/docs/function_calling).
:param streaming_callback: A callback function that is called when a new token is received from the stream.
The callback function accepts StreamingChunk as an argument.
"""
genai.configure(api_key=api_key.resolve_value())

@@ -100,6 +103,7 @@ def __init__(
self._safety_settings = safety_settings
self._tools = tools
self._model = GenerativeModel(self._model_name, tools=self._tools)
self._streaming_callback = streaming_callback

def _generation_config_to_dict(self, config: Union[GenerationConfig, Dict[str, Any]]) -> Dict[str, Any]:
if isinstance(config, dict):
@@ -120,13 +124,15 @@ def to_dict(self) -> Dict[str, Any]:
:returns:
Dictionary with serialized data.
"""
callback_name = serialize_callable(self._streaming_callback) if self._streaming_callback else None
data = default_to_dict(
self,
api_key=self._api_key.to_dict(),
model=self._model_name,
generation_config=self._generation_config,
safety_settings=self._safety_settings,
tools=self._tools,
streaming_callback=callback_name,
)
if (tools := data["init_parameters"].get("tools")) is not None:
data["init_parameters"]["tools"] = [Tool.serialize(t) for t in tools]
@@ -156,6 +162,8 @@ def from_dict(cls, data: Dict[str, Any]) -> "GoogleAIGeminiGenerator":
data["init_parameters"]["safety_settings"] = {
HarmCategory(k): HarmBlockThreshold(v) for k, v in safety_settings.items()
}
if (serialized_callback_handler := data["init_parameters"].get("streaming_callback")) is not None:
data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)

return default_from_dict(cls, data)

@@ -176,28 +184,45 @@ def _convert_part(self, part: Union[str, ByteStream, Part]) -> Part:
raise ValueError(msg)

@component.output_types(replies=List[Union[str, Dict[str, str]]])
def run(self, parts: Variadic[Union[str, ByteStream, Part]]):
def run(
self,
parts: Variadic[Union[str, ByteStream, Part]],
streaming_callback: Optional[Callable[[StreamingChunk], None]] = None,
):
"""
Generates text based on the given input parts.
:param parts:
A heterogeneous list of strings, `ByteStream` or `Part` objects.
:param streaming_callback: A callback function that is called when a new token is received from the stream.
:returns:
A dictionary containing the following key:
- `replies`: A list of strings or dictionaries with function calls.
"""

# A streaming_callback passed to run() takes precedence over the one set at init
streaming_callback = streaming_callback or self._streaming_callback
converted_parts = [self._convert_part(p) for p in parts]

contents = [Content(parts=converted_parts, role="user")]
res = self._model.generate_content(
contents=contents,
generation_config=self._generation_config,
safety_settings=self._safety_settings,
stream=streaming_callback is not None,
)
self._model.start_chat()
replies = self._get_stream_response(res, streaming_callback) if streaming_callback else self._get_response(res)

return {"replies": replies}

def _get_response(self, response_body: GenerateContentResponse) -> List[str]:
"""
Extracts the responses from the Google AI response.
:param response_body: The response body from the Google AI request.
:returns: A list of string responses.
"""
replies = []
for candidate in res.candidates:
for candidate in response_body.candidates:
for part in candidate.content.parts:
if part.text != "":
replies.append(part.text)
@@ -207,5 +232,23 @@ def run(self, parts: Variadic[Union[str, ByteStream, Part]]):
"args": dict(part.function_call.args.items()),
}
replies.append(function_call)
return replies

return {"replies": replies}
def _get_stream_response(
self, stream: GenerateContentResponse, streaming_callback: Callable[[StreamingChunk], None]
) -> List[str]:
"""
Extracts the responses from the Google AI streaming response.
:param stream: The streaming response from the Google AI request.
:param streaming_callback: The handler for the streaming response.
:returns: A list of string responses.
"""

responses = []
for chunk in stream:
content = chunk.text if len(chunk.parts) > 0 and "text" in chunk.parts[0] else ""
streaming_callback(StreamingChunk(content=content, meta=chunk.to_dict()))
responses.append(content)

combined_response = ["".join(responses).lstrip()]
return combined_response
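A matching usage sketch for the non-chat generator, under the same assumptions as above (`GOOGLE_API_KEY` set, names and prompt illustrative). The callback can also be supplied per `run()` call, overriding the one set at init:

    from haystack.dataclasses import StreamingChunk
    from haystack_integrations.components.generators.google_ai import GoogleAIGeminiGenerator

    def print_chunk(chunk: StreamingChunk) -> None:
        print(chunk.content, end="", flush=True)

    generator = GoogleAIGeminiGenerator(streaming_callback=print_chunk)
    result = generator.run(parts=["Summarize why streaming matters for chat UIs."])
    # When streaming, replies is a single-element list holding the concatenated chunks.
    print(result["replies"][0])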