Skip to content

Commit

Permalink
Merge branch 'main' into change-meta-data-fields
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsbatista committed Aug 12, 2024
2 parents 818b292 + 93d2c68 commit 6fbce0a
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 295 deletions.
11 changes: 11 additions & 0 deletions integrations/amazon_bedrock/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## [integrations/amazon_bedrock-v0.10.0] - 2024-08-12

### 🐛 Bug Fixes

- Support streaming_callback param in amazon bedrock generators (#927)

### Docs

- Update AmazonBedrockChatGenerator docstrings (#949)
- Update AmazonBedrockGenerator docstrings (#956)

## [integrations/amazon_bedrock-v0.9.3] - 2024-07-17

### 🚀 Features
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional

from .handlers import TokenStreamingHandler
from haystack.dataclasses import StreamingChunk


class BedrockModelAdapter(ABC):
Expand Down Expand Up @@ -39,22 +39,24 @@ def get_responses(self, response_body: Dict[str, Any]) -> List[str]:
responses = [completion.lstrip() for completion in completions]
return responses

def get_stream_responses(self, stream, stream_handler: TokenStreamingHandler) -> List[str]:
def get_stream_responses(self, stream, streaming_callback: Callable[[StreamingChunk], None]) -> List[str]:
"""
Extracts the responses from the Amazon Bedrock streaming response.
:param stream: The streaming response from the Amazon Bedrock request.
:param stream_handler: The handler for the streaming response.
:param streaming_callback: The handler for the streaming response.
:returns: A list of string responses.
"""
tokens: List[str] = []
streaming_chunks: List[StreamingChunk] = []
for event in stream:
chunk = event.get("chunk")
if chunk:
decoded_chunk = json.loads(chunk["bytes"].decode("utf-8"))
token = self._extract_token_from_stream(decoded_chunk)
tokens.append(stream_handler(token, event_data=decoded_chunk))
responses = ["".join(tokens).lstrip()]
streaming_chunk: StreamingChunk = self._build_streaming_chunk(decoded_chunk)
streaming_chunks.append(streaming_chunk)
streaming_callback(streaming_chunk)

responses = ["".join(streaming_chunk.content for streaming_chunk in streaming_chunks).lstrip()]
return responses

def _get_params(self, inference_kwargs: Dict[str, Any], default_params: Dict[str, Any]) -> Dict[str, Any]:
Expand Down Expand Up @@ -84,12 +86,12 @@ def _extract_completions_from_response(self, response_body: Dict[str, Any]) -> L
"""

@abstractmethod
def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: A string token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""


Expand Down Expand Up @@ -150,17 +152,17 @@ def _extract_completions_from_response(self, response_body: Dict[str, Any]) -> L

return [response_body["completion"]]

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: A string token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""
if self.use_messages_api:
return chunk.get("delta", {}).get("text", "")
return StreamingChunk(content=chunk.get("delta", {}).get("text", ""), meta=chunk)

return chunk.get("completion", "")
return StreamingChunk(content=chunk.get("completion", ""), meta=chunk)


class MistralAdapter(BedrockModelAdapter):
Expand Down Expand Up @@ -199,17 +201,18 @@ def _extract_completions_from_response(self, response_body: Dict[str, Any]) -> L
"""
return [output.get("text", "") for output in response_body.get("outputs", [])]

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: A string token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""
content = ""
chunk_list = chunk.get("outputs", [])
if chunk_list:
return chunk_list[0].get("text", "")
return ""
content = chunk_list[0].get("text", "")
return StreamingChunk(content=content, meta=chunk)


class CohereCommandAdapter(BedrockModelAdapter):
Expand Down Expand Up @@ -254,14 +257,14 @@ def _extract_completions_from_response(self, response_body: Dict[str, Any]) -> L
responses = [generation["text"] for generation in response_body["generations"]]
return responses

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: A string token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""
return chunk.get("text", "")
return StreamingChunk(content=chunk.get("text", ""), meta=chunk)


class CohereCommandRAdapter(BedrockModelAdapter):
Expand Down Expand Up @@ -313,15 +316,15 @@ def _extract_completions_from_response(self, response_body: Dict[str, Any]) -> L
responses = [response_body["text"]]
return responses

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: A string token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""
token: str = chunk.get("text", "")
return token
return StreamingChunk(content=token, meta=chunk)


class AI21LabsJurassic2Adapter(BedrockModelAdapter):
Expand Down Expand Up @@ -357,7 +360,7 @@ def _extract_completions_from_response(self, response_body: Dict[str, Any]) -> L
responses = [completion["data"]["text"] for completion in response_body["completions"]]
return responses

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
msg = "Streaming is not supported for AI21 Jurassic 2 models."
raise NotImplementedError(msg)

Expand Down Expand Up @@ -398,14 +401,14 @@ def _extract_completions_from_response(self, response_body: Dict[str, Any]) -> L
responses = [result["outputText"] for result in response_body["results"]]
return responses

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: A string token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""
return chunk.get("outputText", "")
return StreamingChunk(content=chunk.get("outputText", ""), meta=chunk)


class MetaLlamaAdapter(BedrockModelAdapter):
Expand Down Expand Up @@ -442,11 +445,11 @@ def _extract_completions_from_response(self, response_body: Dict[str, Any]) -> L
"""
return [response_body["generation"]]

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: A string token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""
return chunk.get("generation", "")
return StreamingChunk(content=chunk.get("generation", ""), meta=chunk)
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,18 @@ def get_responses(self, response_body: Dict[str, Any]) -> List[ChatMessage]:
return self._extract_messages_from_response(response_body)

def get_stream_responses(
self, stream: EventStream, stream_handler: Callable[[StreamingChunk], None]
self, stream: EventStream, streaming_callback: Callable[[StreamingChunk], None]
) -> List[ChatMessage]:
tokens: List[str] = []
streaming_chunks: List[StreamingChunk] = []
last_decoded_chunk: Dict[str, Any] = {}
for event in stream:
chunk = event.get("chunk")
if chunk:
last_decoded_chunk = json.loads(chunk["bytes"].decode("utf-8"))
token = self._extract_token_from_stream(last_decoded_chunk)
stream_chunk = StreamingChunk(content=token) # don't extract meta, we care about tokens only
stream_handler(stream_chunk) # callback the stream handler with StreamingChunk
tokens.append(token)
responses = ["".join(tokens).lstrip()]
streaming_chunk = self._build_streaming_chunk(last_decoded_chunk)
streaming_callback(streaming_chunk) # callback the stream handler with StreamingChunk
streaming_chunks.append(streaming_chunk)
responses = ["".join(chunk.content for chunk in streaming_chunks).lstrip()]
return [ChatMessage.from_assistant(response, meta=last_decoded_chunk) for response in responses]

@staticmethod
Expand Down Expand Up @@ -142,12 +141,12 @@ def _extract_messages_from_response(self, response_body: Dict[str, Any]) -> List
"""

@abstractmethod
def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: The extracted token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""


Expand Down Expand Up @@ -252,16 +251,16 @@ def _extract_messages_from_response(self, response_body: Dict[str, Any]) -> List
messages.append(ChatMessage.from_assistant(content["text"], meta=meta))
return messages

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: The extracted token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""
if chunk.get("type") == "content_block_delta" and chunk.get("delta", {}).get("type") == "text_delta":
return chunk.get("delta", {}).get("text", "")
return ""
return StreamingChunk(content=chunk.get("delta", {}).get("text", ""), meta=chunk)
return StreamingChunk(content="", meta=chunk)

def _to_anthropic_message(self, m: ChatMessage) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -425,17 +424,17 @@ def _extract_messages_from_response(self, response_body: Dict[str, Any]) -> List
messages.append(ChatMessage.from_assistant(response["text"], meta=meta))
return messages

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: The extracted token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""
response_chunk = chunk.get("outputs", [])
if response_chunk:
return response_chunk[0].get("text", "")
return ""
return StreamingChunk(content=response_chunk[0].get("text", ""), meta=chunk)
return StreamingChunk(content="", meta=chunk)


class MetaLlama2ChatAdapter(BedrockModelChatAdapter):
Expand Down Expand Up @@ -543,11 +542,11 @@ def _extract_messages_from_response(self, response_body: Dict[str, Any]) -> List
metadata = {k: v for (k, v) in response_body.items() if k != message_tag}
return [ChatMessage.from_assistant(response_body[message_tag], meta=metadata)]

def _extract_token_from_stream(self, chunk: Dict[str, Any]) -> str:
def _build_streaming_chunk(self, chunk: Dict[str, Any]) -> StreamingChunk:
"""
Extracts the token from a streaming chunk.
Extracts the content and meta from a streaming chunk.
:param chunk: The streaming chunk.
:returns: The extracted token.
:param chunk: The streaming chunk as dict.
:returns: A StreamingChunk object.
"""
return chunk.get("generation", "")
return StreamingChunk(content=chunk.get("generation", ""), meta=chunk)
Loading

0 comments on commit 6fbce0a

Please sign in to comment.