From af0ef69170d736974f392ac69a46c4d055ff271c Mon Sep 17 00:00:00 2001 From: Angelo Paparazzi Date: Fri, 22 Nov 2024 12:29:29 -0500 Subject: [PATCH] feat(common): add parse_sse_stream_data function This function parses the raw buffer recieved from sse requests and yields a parsed dictionary --- ibm_watson/common.py | 23 +++++++++- test/integration/test_assistant_v2.py | 65 +++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 test/integration/test_assistant_v2.py diff --git a/ibm_watson/common.py b/ibm_watson/common.py index 81ecc7ec0..a1595d614 100644 --- a/ibm_watson/common.py +++ b/ibm_watson/common.py @@ -1,6 +1,6 @@ # coding: utf-8 -# Copyright 2019 IBM All Rights Reserved. +# Copyright 2019, 2024 IBM All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,9 @@ # limitations under the License. import platform +import json from .version import __version__ +from typing import Iterator SDK_ANALYTICS_HEADER = 'X-IBMCloud-SDK-Analytics' USER_AGENT_HEADER = 'User-Agent' @@ -48,3 +50,22 @@ def get_sdk_headers(service_name, service_version, operation_id): operation_id) headers[USER_AGENT_HEADER] = get_user_agent() return headers + + +def parse_sse_stream_data(response) -> Iterator[dict]: + event_message = None # Can be used in the future to return the event message to the user + data_json = None + + for chunk in response.iter_lines(): + decoded_chunk = chunk.decode("utf-8") + + if decoded_chunk.find("event", 0, len("event")) == 0: + event_message = decoded_chunk[len("event") + 2:] + elif decoded_chunk.find("data", 0, len("data")) == 0: + data_json_str = decoded_chunk[len("data") + 2:] + data_json = json.loads(data_json_str) + + if event_message and data_json is not None: + yield data_json + event_message = None + data_json = None diff --git a/test/integration/test_assistant_v2.py b/test/integration/test_assistant_v2.py new file mode 100644 index 000000000..db17bcae0 --- /dev/null +++ b/test/integration/test_assistant_v2.py @@ -0,0 +1,65 @@ +# coding: utf-8 + +# Copyright 2019, 2024 IBM All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import TestCase +import ibm_watson +from ibm_watson.assistant_v2 import MessageInput +from ibm_watson.common import parse_sse_stream_data +import pytest +import json +from ibm_cloud_sdk_core.authenticators import IAMAuthenticator + +class TestAssistantV2(TestCase): + + def setUp(self): + + with open('./auth.json') as f: + data = json.load(f) + assistant_auth = data.get("assistantv2") + self.assistant_id = assistant_auth.get("assistantId") + self.environment_id = assistant_auth.get("environmentId") + + self.authenticator = IAMAuthenticator(apikey=assistant_auth.get("apikey")) + self.assistant = ibm_watson.AssistantV2(version='2024-08-25', authenticator=self.authenticator) + self.assistant.set_service_url(assistant_auth.get("serviceUrl")) + self.assistant.set_default_headers({ + 'X-Watson-Learning-Opt-Out': '1', + 'X-Watson-Test': '1' + }) + + def test_list_assistants(self): + response = self.assistant.list_assistants().get_result() + assert response is not None + + def test_message_stream_stateless(self): + input = MessageInput(message_type="text", text="can you list the steps to create a custom extension?") + user_id = "Angelo" + + response = self.assistant.message_stream_stateless(self.assistant_id, self.environment_id, input=input, user_id=user_id).get_result() + + for data in parse_sse_stream_data(response): + # One of these items must exist + # assert "partial_item" in data_json or "complete_item" in data_json or "final_item" in data_json + + if "partial_item" in data: + assert data["partial_item"]["text"] is not None + elif "complete_item" in data: + assert data["complete_item"]["text"] is not None + elif "final_response" in data: + assert data["final_response"] is not None + else: + pytest.fail("Should be impossible to get here") +