From df9ba31f5e1e6d122e9249e52c60bee0358f94c1 Mon Sep 17 00:00:00 2001 From: Nupur Khare Date: Thu, 28 Dec 2023 20:32:50 +0530 Subject: [PATCH] 1. Added Async rest client in Channels (whatsapp, messenger, msteams) instead of requests. 2. Fixed unit and integration test cases for the same. --- .../channels/clients/messenger/__init__.py | 0 .../clients/messenger/messenger_client.py | 302 ++++++++++++++++++ .../channels/clients/whatsapp/cloud.py | 46 ++- .../channels/clients/whatsapp/dialog360.py | 20 +- .../channels/clients/whatsapp/on_premise.py | 47 ++- kairon/chat/handlers/channels/messenger.py | 13 +- kairon/chat/handlers/channels/msteams.py | 33 +- kairon/events/definitions/base.py | 2 +- .../events/definitions/message_broadcast.py | 4 +- kairon/shared/channels/broadcast/whatsapp.py | 15 +- tests/integration_test/chat_service_test.py | 129 ++++---- tests/unit_test/channel_client_test.py | 294 +++++++++-------- tests/unit_test/events/events_test.py | 50 +-- 13 files changed, 650 insertions(+), 305 deletions(-) create mode 100644 kairon/chat/handlers/channels/clients/messenger/__init__.py create mode 100644 kairon/chat/handlers/channels/clients/messenger/messenger_client.py diff --git a/kairon/chat/handlers/channels/clients/messenger/__init__.py b/kairon/chat/handlers/channels/clients/messenger/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kairon/chat/handlers/channels/clients/messenger/messenger_client.py b/kairon/chat/handlers/channels/clients/messenger/messenger_client.py new file mode 100644 index 000000000..be044f240 --- /dev/null +++ b/kairon/chat/handlers/channels/clients/messenger/messenger_client.py @@ -0,0 +1,302 @@ +from __future__ import absolute_import + +import abc +import hashlib +import hmac +import logging + +import six + +__version__ = '6.0.0' + +from kairon.shared.rest_client import AioRestClient + +logger = logging.getLogger(__name__) + + +DEFAULT_API_VERSION = 2.12 + + +class MessengerClientOutput(object): + + # https://developers.facebook.com/docs/messenger-platform/send-messages#messaging_types + MESSAGING_TYPES = { + 'RESPONSE', + 'UPDATE', + 'MESSAGE_TAG', + } + + # https://developers.facebook.com/docs/messenger-platform/reference/send-api/#payload + NOTIFICATION_TYPES = { + 'REGULAR', + 'SILENT_PUSH', + 'NO_PUSH' + } + + def __init__(self, page_access_token, **kwargs): + """ + @required: + page_access_token + @optional: + session + api_version + app_secret + """ + + self.page_access_token = page_access_token + self.session = AioRestClient(False) + self.api_version = kwargs.get('api_version', DEFAULT_API_VERSION) + self.graph_url = 'https://graph.facebook.com/v{api_version}'.format(api_version=self.api_version) + self.app_secret = kwargs.get('app_secret') + + @property + def auth_args(self): + if not hasattr(self, '_auth_args'): + auth = { + 'access_token': self.page_access_token + } + if self.app_secret is not None: + appsecret_proof = self.generate_appsecret_proof() + auth['appsecret_proof'] = appsecret_proof + self._auth_args = auth + return self._auth_args + + async def make_request(self, method, url, request_body=None, headers=None, **kwargs): + is_streaming_resp = kwargs.pop('is_streaming_resp', False) + timeout = kwargs.pop('timeout', None) + resp = await self.session.request( + method, url, request_body=request_body, headers=headers, timeout=timeout, + return_json=False, is_streaming_resp=is_streaming_resp, max_retries=3 + ) + logger.debug(resp) + return resp + + async def get_user_data(self, recipient_id, fields=None, timeout=None): + params = {} + + if isinstance(fields, six.string_types): + params['fields'] = fields + elif isinstance(fields, (list, tuple)): + params['fields'] = ','.join(fields) + else: + params['fields'] = 'first_name,last_name,profile_pic,locale,timezone,gender' + + params.update(self.auth_args) + + url = '{graph_url}/{recipient_id}'.format(graph_url=self.graph_url, recipient_id=recipient_id) + resp = await self.make_request("GET", url, headers=self.auth_args, timeout=timeout) + return resp + + async def send(self, payload, recipient_id, messaging_type='RESPONSE', notification_type='REGULAR', + timeout=None, tag=None): + if messaging_type not in self.MESSAGING_TYPES: + raise ValueError('`{}` is not a valid `messaging_type`'.format(messaging_type)) + + if notification_type not in self.NOTIFICATION_TYPES: + raise ValueError('`{}` is not a valid `notification_type`'.format(notification_type)) + + body = { + 'messaging_type': messaging_type, + 'notification_type': notification_type, + 'recipient': { + 'id': recipient_id, + }, + 'message': payload, + } + + if tag: + body['tag'] = tag + + url = '{graph_url}/me/messages'.format(graph_url=self.graph_url) + resp = await self.make_request("POST", url, headers=self.auth_args, request_body=body, timeout=timeout) + return resp + + async def send_action(self, sender_action, recipient_id, timeout=None): + body = { + 'recipient': { + 'id': recipient_id, + }, + 'sender_action': sender_action + } + url = '{graph_url}/me/messages'.format(graph_url=self.graph_url) + resp = await self.make_request("POST", url, headers=self.auth_args, request_body=body, timeout=timeout) + return resp + + async def subscribe_app_to_page(self, timeout=None): + url = '{graph_url}/me/subscribed_apps'.format(graph_url=self.graph_url) + resp = await self.make_request("POST", url, headers=self.auth_args, timeout=timeout) + return resp + + async def set_messenger_profile(self, data, timeout=None): + url = '{graph_url}/me/messenger_profile'.format(graph_url=self.graph_url) + resp = await self.make_request("POST", url, headers=self.auth_args, request_body=data, timeout=timeout) + return resp + + async def delete_get_started(self, timeout=None): + body = { + 'fields': [ + 'get_started' + ], + } + url = '{graph_url}/me/messenger_profile'.format(graph_url=self.graph_url) + resp = await self.make_request("DELETE", url, headers=self.auth_args, request_body=body, timeout=timeout) + return resp + + async def delete_persistent_menu(self, timeout=None): + body = { + 'fields': [ + 'persistent_menu' + ], + } + url = '{graph_url}/me/messenger_profile'.format(graph_url=self.graph_url) + resp = await self.make_request("DELETE", url, headers=self.auth_args, request_body=body, timeout=timeout) + return resp + + async def link_account(self, account_linking_token, timeout=None): + params = dict({ + 'fields': 'recipient', + 'account_linking_token': account_linking_token + }, **self.auth_args) + url = '{graph_url}/me'.format(graph_url=self.graph_url) + resp = await self.make_request("POST", url, headers=params, timeout=timeout) + return resp + + async def unlink_account(self, psid, timeout=None): + payload = { + 'psid': psid + } + url = '{graph_url}/me/unlink_accounts'.format(graph_url=self.graph_url) + resp = await self.make_request("POST", url, headers=self.auth_args, request_body=payload, timeout=timeout) + return resp + + async def update_whitelisted_domains(self, domains, timeout=None): + if not isinstance(domains, list): + domains = [domains] + payload = { + 'whitelisted_domains': domains + } + url = '{graph_url}/me/messenger_profile'.format(graph_url=self.graph_url) + resp = await self.make_request("POST", url, headers=self.auth_args, request_body=payload, timeout=timeout) + return resp + + async def remove_whitelisted_domains(self, timeout=None): + payload = { + 'fields': [ + 'whitelisted_domains' + ], + } + url = '{graph_url}/me/messenger_profile'.format(graph_url=self.graph_url) + resp = await self.make_request("DELETE", url, headers=self.auth_args, request_body=payload, timeout=timeout) + return resp + + async def upload_attachment(self, attachment, timeout=None): + if not attachment.url: + raise ValueError('Attachment must have `url` specified') + if attachment.quick_replies: + raise ValueError('Attachment may not have `quick_replies`') + body = { + 'message': attachment.to_dict() + } + url = '{graph_url}/me/message_attachments'.format(graph_url=self.graph_url) + resp = await self.make_request("POST", url, headers=self.auth_args, request_body=body, timeout=timeout) + return resp + + def generate_appsecret_proof(self): + """ + @outputs: + appsecret_proof: HMAC-SHA256 hash of page access token + using app_secret as the key + """ + app_secret = str(self.app_secret).encode('utf8') + access_token = str(self.page_access_token).encode('utf8') + + return hmac.new(app_secret, access_token, hashlib.sha256).hexdigest() + + +class BaseMessenger(object): + __metaclass__ = abc.ABCMeta + + last_message = {} + + def __init__(self, page_access_token, app_secret=None): + self.page_access_token = page_access_token + self.app_secret = app_secret + self.client = MessengerClientOutput(self.page_access_token, app_secret=self.app_secret) + + @abc.abstractmethod + def account_linking(self, message): + """Method to handle `account_linking`""" + + @abc.abstractmethod + def message(self, message): + """Method to handle `messages`""" + + @abc.abstractmethod + def delivery(self, message): + """Method to handle `message_deliveries`""" + + @abc.abstractmethod + def optin(self, message): + """Method to handle `messaging_optins`""" + + @abc.abstractmethod + def postback(self, message): + """Method to handle `messaging_postbacks`""" + + @abc.abstractmethod + def read(self, message): + """Method to handle `message_reads`""" + + def handle(self, payload): + for entry in payload['entry']: + for message in entry['messaging']: + self.last_message = message + if message.get('account_linking'): + return self.account_linking(message) + elif message.get('delivery'): + return self.delivery(message) + elif message.get('message'): + return self.message(message) + elif message.get('optin'): + return self.optin(message) + elif message.get('postback'): + return self.postback(message) + elif message.get('read'): + return self.read(message) + + def get_user(self, fields=None, timeout=None): + return self.client.get_user_data(self.get_user_id(), fields=fields, timeout=timeout) + + def send(self, payload, messaging_type='RESPONSE', notification_type='REGULAR', timeout=None, tag=None): + return self.client.send(payload, self.get_user_id(), messaging_type=messaging_type, + notification_type=notification_type, timeout=timeout, tag=tag) + + def send_action(self, sender_action, timeout=None): + return self.client.send_action(sender_action, self.get_user_id(), timeout=timeout) + + def get_user_id(self): + return self.last_message['sender']['id'] + + def subscribe_app_to_page(self, timeout=None): + return self.client.subscribe_app_to_page(timeout=timeout) + + def set_messenger_profile(self, data, timeout=None): + return self.client.set_messenger_profile(data, timeout=timeout) + + def delete_get_started(self, timeout=None): + return self.client.delete_get_started(timeout=timeout) + + def link_account(self, account_linking_token, timeout=None): + return self.client.link_account(account_linking_token, timeout=timeout) + + def unlink_account(self, psid, timeout=None): + return self.client.unlink_account(psid, timeout=timeout) + + def add_whitelisted_domains(self, domains, timeout=None): + return self.client.update_whitelisted_domains(domains, timeout=timeout) + + def remove_whitelisted_domains(self, timeout=None): + return self.client.remove_whitelisted_domains(timeout=timeout) + + def upload_attachment(self, attachment, timeout=None): + return self.client.upload_attachment(attachment, timeout=timeout) diff --git a/kairon/chat/handlers/channels/clients/whatsapp/cloud.py b/kairon/chat/handlers/channels/clients/whatsapp/cloud.py index 9016c7222..1dc434143 100644 --- a/kairon/chat/handlers/channels/clients/whatsapp/cloud.py +++ b/kairon/chat/handlers/channels/clients/whatsapp/cloud.py @@ -3,10 +3,9 @@ import logging from typing import Text, Dict -import requests - from kairon import Utility from kairon.exceptions import AppException +from kairon.shared.rest_client import AioRestClient logger = logging.getLogger(__name__) @@ -40,7 +39,7 @@ def __init__(self, access_token, **kwargs): self.from_phone_number_id = kwargs.get('from_phone_number_id') if self.client_type == "meta" and Utility.check_empty_string(self.from_phone_number_id): raise AppException("missing parameter 'from_phone_number_id'") - self.session = kwargs.get('session', requests.Session()) + self.client = AioRestClient(False) self.api_version = kwargs.get('api_version', DEFAULT_API_VERSION) self.app = 'https://graph.facebook.com/v{api_version}'.format(api_version=self.api_version) self.app_secret = kwargs.get('app_secret') @@ -61,7 +60,7 @@ def auth_args(self): self._auth_args = auth return self._auth_args - def send(self, payload, to_phone_number, messaging_type, recipient_type='individual', timeout=None, tag=None): + async def send(self, payload, to_phone_number, messaging_type, recipient_type='individual', timeout=None, tag=None): """ @required: payload: message request payload @@ -87,9 +86,9 @@ def send(self, payload, to_phone_number, messaging_type, recipient_type='individ if tag: body['tag'] = tag - return self.send_action(body) + return await self.send_action(body) - def send_json(self, payload: dict, to_phone_number, recipient_type='individual', timeout=None): + async def send_json(self, payload: dict, to_phone_number, recipient_type='individual', timeout=None): """ @required: payload: message request payload @@ -105,9 +104,9 @@ def send_json(self, payload: dict, to_phone_number, recipient_type='individual', "to": to_phone_number }) - return self.send_action(payload) + return await self.send_action(payload) - def send_action(self, payload, timeout=None, **kwargs): + async def send_action(self, payload, timeout=None, **kwargs): """ @required: payload: message request payload @@ -115,17 +114,15 @@ def send_action(self, payload, timeout=None, **kwargs): timeout: request timeout @outputs: response json """ - r = self.session.post( - '{app}/{from_phone_number_id}/messages'.format(app=self.app, from_phone_number_id=self.from_phone_number_id), - params=self.auth_args, - json=payload, - timeout=timeout - ) - resp = r.json() + is_streaming_resp = kwargs.get("stream", False) + url = '{app}/{from_phone_number_id}/messages'.format(app=self.app, + from_phone_number_id=self.from_phone_number_id) + resp = await self.client.request("POST", url, headers=self.auth_args, request_body=payload, timeout=timeout, + return_json=False, is_streaming_resp=is_streaming_resp, max_retries=3) logger.debug(resp) return resp - def get_attachment(self, attachment_id, timeout=None): + async def get_attachment(self, attachment_id, timeout=None): """ @required: attachment_id: audio/video/image/document id @@ -133,18 +130,15 @@ def get_attachment(self, attachment_id, timeout=None): timeout: request timeout @outputs: response json """ - r = self.session.get( - '{app}/{attachment_id}'.format(app=self.app, attachment_id=attachment_id), - params=self.auth_args, - timeout=timeout - ) - resp = r.json() + url = '{app}/{attachment_id}'.format(app=self.app, attachment_id=attachment_id) + resp = await self.client.request("GET", url, headers=self.auth_args, timeout=timeout, + return_json=False, max_retries=3) logger.debug(resp) return resp - def mark_as_read(self, msg_id, timeout=None): + async def mark_as_read(self, msg_id, timeout=None): payload = {"messaging_product": "whatsapp", "status": "read", "message_id": msg_id} - return self.send_action(payload) + return await self.send_action(payload) def generate_appsecret_proof(self): """ @@ -157,7 +151,7 @@ def generate_appsecret_proof(self): return hmac.new(app_secret, access_token, hashlib.sha256).hexdigest() - def send_template_message(self, name: Text, to_phone_number, language_code: Text = "en", components: Dict = None, namespace: Text = None): + async def send_template_message(self, name: Text, to_phone_number, language_code: Text = "en", components: Dict = None, namespace: Text = None): payload = { "language": { "code": language_code @@ -166,4 +160,4 @@ def send_template_message(self, name: Text, to_phone_number, language_code: Text } if components: payload.update({"components": components}) - return self.send(payload, to_phone_number, messaging_type="template") + return await self.send(payload, to_phone_number, messaging_type="template") diff --git a/kairon/chat/handlers/channels/clients/whatsapp/dialog360.py b/kairon/chat/handlers/channels/clients/whatsapp/dialog360.py index 6e45a39f3..84f9df619 100644 --- a/kairon/chat/handlers/channels/clients/whatsapp/dialog360.py +++ b/kairon/chat/handlers/channels/clients/whatsapp/dialog360.py @@ -1,7 +1,8 @@ +from loguru import logger + from kairon import Utility from kairon.chat.handlers.channels.clients.whatsapp.cloud import WhatsappCloud from kairon.shared.constants import WhatsappBSPTypes -from loguru import logger class BSP360Dialog(WhatsappCloud): @@ -23,7 +24,7 @@ def auth_args(self): self._auth_args = {self.auth_header: self.access_token} return self._auth_args - def send_action(self, payload, timeout=None, **kwargs): + async def send_action(self, payload, timeout=None, **kwargs): """ @required: payload: message request payload @@ -31,16 +32,13 @@ def send_action(self, payload, timeout=None, **kwargs): timeout: request timeout @outputs: response json """ - r = self.session.post( - '{app}/messages'.format(app=self.app), - headers=self.auth_args, - json=payload, - timeout=timeout - ) - resp = r.json() + is_streaming_resp = kwargs.get("stream", False) + url = '{app}/messages'.format(app=self.app) + resp = await self.client.request("POST", url, headers=self.auth_args, request_body=payload, timeout=timeout, + return_json=False, is_streaming_resp=is_streaming_resp, max_retries=3) logger.debug(resp) return resp - def mark_as_read(self, msg_id, timeout=None): + async def mark_as_read(self, msg_id, timeout=None): payload = {"messaging_product": "whatsapp", "status": "read", "message_id": msg_id} - return self.send_action(payload) + return await self.send_action(payload) diff --git a/kairon/chat/handlers/channels/clients/whatsapp/on_premise.py b/kairon/chat/handlers/channels/clients/whatsapp/on_premise.py index 9c1cd985e..41bd4c749 100644 --- a/kairon/chat/handlers/channels/clients/whatsapp/on_premise.py +++ b/kairon/chat/handlers/channels/clients/whatsapp/on_premise.py @@ -1,11 +1,10 @@ import logging from typing import Text, Dict -import requests - +from kairon.chat.handlers.channels.clients.whatsapp.cloud import WhatsappCloud from kairon.exceptions import AppException +from kairon.shared.rest_client import AioRestClient from kairon.shared.utils import Utility -from kairon.chat.handlers.channels.clients.whatsapp.cloud import WhatsappCloud logger = logging.getLogger(__name__) @@ -25,9 +24,9 @@ def __init__(self, access_token, **kwargs): """ super().__init__(access_token, **kwargs) self.access_token = access_token - self.session = kwargs.get('session', requests.Session()) + self.client = AioRestClient(False) - def send_action(self, payload, timeout=None, **kwargs): + async def send_action(self, payload, timeout=None, **kwargs): """ @required: payload: message request payload @@ -35,17 +34,14 @@ def send_action(self, payload, timeout=None, **kwargs): timeout: request timeout @outputs: response json """ - r = self.session.post( - '{app}/messages'.format(app=self.app), - headers=self.auth_args, - json=payload, - timeout=timeout - ) - resp = r.json() + is_streaming_resp = kwargs.get("stream", False) + url = '{app}/messages'.format(app=self.app) + resp = await self.client.request("POST", url, headers=self.auth_args, request_body=payload, timeout=timeout, + return_json=False, is_streaming_resp=is_streaming_resp, max_retries=3) logger.debug(resp) return resp - def get_attachment(self, media_id, timeout=None): + async def get_attachment(self, media_id, timeout=None): """ @required: media_id: audio/video/image/document id @@ -53,30 +49,23 @@ def get_attachment(self, media_id, timeout=None): timeout: request timeout @outputs: response json """ - r = self.session.get( - '{app}/media/{media_id}'.format(app=self.app, media_id=media_id), - headers=self.auth_args, - timeout=timeout - ) - resp = r.json() + url = '{app}/media/{media_id}'.format(app=self.app, media_id=media_id) + resp = await self.client.request("GET", url, headers=self.auth_args, timeout=timeout, + return_json=False, max_retries=3) logger.debug(resp) return resp - def mark_as_read(self, msg_id, timeout=None): + async def mark_as_read(self, msg_id, timeout=None): payload = { "status": "read" } - r = self.session.put( - '{app}/messages/{message_id}'.format(app=self.app, message_id=msg_id), - headers=self.auth_args, - json=payload, - timeout=timeout - ) - resp = r.json() + url = '{app}/messages/{message_id}'.format(app=self.app, message_id=msg_id) + resp = await self.client.request("PUT", url, headers=self.auth_args, request_body=payload, timeout=timeout, + return_json=False, max_retries=3) logger.debug(resp) return resp - def send_template_message(self, name: Text, to_phone_number, language_code: Text = "en", components: Dict = None, namespace: Text = None): + async def send_template_message(self, name: Text, to_phone_number, language_code: Text = "en", components: Dict = None, namespace: Text = None): if Utility.check_empty_string(namespace): raise AppException("namespace is required to send messages using on-premises api!") @@ -90,4 +79,4 @@ def send_template_message(self, name: Text, to_phone_number, language_code: Text } if components: payload.update({"components": components}) - return self.send(payload, to_phone_number, messaging_type="template") + return await self.send(payload, to_phone_number, messaging_type="template") diff --git a/kairon/chat/handlers/channels/messenger.py b/kairon/chat/handlers/channels/messenger.py index f23bfc22f..052c3e489 100644 --- a/kairon/chat/handlers/channels/messenger.py +++ b/kairon/chat/handlers/channels/messenger.py @@ -2,8 +2,8 @@ import hmac import logging from typing import Text, List, Dict, Any, Iterable, Optional, Union + import rasa.shared.utils.io -from fbmessenger import MessengerClient from fbmessenger.attachments import Image from fbmessenger.elements import Text as FBText from fbmessenger.quick_replies import QuickReplies, QuickReply @@ -11,13 +11,14 @@ from rasa.core.channels.channel import UserMessage, OutputChannel, InputChannel from starlette.requests import Request +from kairon import Utility from kairon.chat.agent_processor import AgentProcessor +from kairon.chat.converters.channels.response_factory import ConverterFactory from kairon.chat.handlers.channels.base import ChannelHandlerBase +from kairon.chat.handlers.channels.clients.messenger.messenger_client import MessengerClientOutput from kairon.shared.chat.processor import ChatDataProcessor from kairon.shared.constants import ChannelTypes from kairon.shared.models import User -from kairon import Utility -from kairon.chat.converters.channels.response_factory import ConverterFactory logger = logging.getLogger(__name__) @@ -34,7 +35,7 @@ def __init__( page_access_token: Text, ) -> None: - self.client = MessengerClient(page_access_token) + self.client = MessengerClientOutput(page_access_token) self.last_message: Dict[Text, Any] = {} def get_user_id(self) -> Text: @@ -177,7 +178,7 @@ class MessengerBot(OutputChannel): def name(cls) -> Text: return "facebook" - def __init__(self, messenger_client: MessengerClient) -> None: + def __init__(self, messenger_client: MessengerClientOutput) -> None: self.messenger_client = messenger_client super().__init__() @@ -411,7 +412,7 @@ def validate_hub_signature( return False def get_output_channel(self) -> OutputChannel: - client = MessengerClient(self.fb_access_token) + client = MessengerClientOutput(self.fb_access_token) return MessengerBot(client) diff --git a/kairon/chat/handlers/channels/msteams.py b/kairon/chat/handlers/channels/msteams.py index 370470edf..5bfd3aba7 100644 --- a/kairon/chat/handlers/channels/msteams.py +++ b/kairon/chat/handlers/channels/msteams.py @@ -17,11 +17,11 @@ from kairon import Utility from kairon.chat.agent_processor import AgentProcessor from kairon.chat.converters.channels.response_factory import ConverterFactory -from kairon.chat.converters.channels.responseconverter import ElementTransformerOps from kairon.chat.handlers.channels.base import ChannelHandlerBase from kairon.shared.chat.processor import ChatDataProcessor from kairon.shared.constants import ChannelTypes from kairon.shared.models import User +from kairon.shared.rest_client import AioRestClient class MSTeamBot(OutputChannel): @@ -42,6 +42,7 @@ def __init__( conversation: Dict[Text, Any], bot: Text, service_url: Text, + session: AioRestClient(False), ) -> None: service_url = ( f"{service_url}/" if not service_url.endswith("/") else service_url) @@ -50,6 +51,15 @@ def __init__( self.conversation = conversation self.global_uri = f"{service_url}v3/" self.bot = bot + self.session = session + + async def make_request(self, method, url, request_body=None, headers=None, timeout=None, is_streaming_resp=False): + resp = await self.session.request( + method, url, request_body=request_body, headers=headers, timeout=timeout, + return_json=False, is_streaming_resp=is_streaming_resp, max_retries=3 + ) + logger.debug(resp) + return resp async def _get_headers(self, refetch=False) -> Optional[Dict[Text, Any]]: ms_oauthurl = Utility.system_metadata["channels"][ChannelTypes.MSTEAMS.value]["MICROSOFT_OAUTH2_URL"] @@ -65,12 +75,11 @@ async def _get_headers(self, refetch=False) -> Optional[Dict[Text, Any]]: 'scope': scope, } - token_response = requests.post(uri, data=payload) + token_response = await self.make_request("POST", uri, request_body=payload) - if token_response.ok: - token_data = token_response.json() - access_token = token_data["access_token"] - token_expiration = token_data["expires_in"] + if token_response.status_code == 200: + access_token = token_response["access_token"] + token_expiration = token_response["expires_in"] delta = datetime.timedelta(seconds=int(token_expiration)) MSTeamBot.token_expiration_date = datetime.datetime.now() + delta @@ -104,24 +113,20 @@ async def send(self, message_data: Dict[Text, Any]) -> None: self.global_uri, self.conversation["id"] ) headers = await self._get_headers() - send_response = requests.post( - post_message_uri,headers=headers, data=json.dumps(message_data) - ) + send_response = await self.make_request("POST", post_message_uri, request_body=json.dumps(message_data), headers=headers) if send_response.status_code == 403: headers = await self._get_headers(True) - send_response = requests.post( - post_message_uri, headers=headers, data=json.dumps(message_data) - ) + send_response = await self.make_request("POST", post_message_uri, request_body=json.dumps(message_data), + headers=headers) - if not send_response.ok: + if not send_response != 200: logger.error( "Error trying to send botframework messge. Response: %s", send_response.text, ) raise Exception(f"Exception while responding to MSTeams:: {send_response.text} and status::{send_response.status_code}") - async def send_text_message( self, recipient_id: Text, text: Text, **kwargs: Any ) -> None: diff --git a/kairon/events/definitions/base.py b/kairon/events/definitions/base.py index 6ff40442b..a56c0ef0b 100644 --- a/kairon/events/definitions/base.py +++ b/kairon/events/definitions/base.py @@ -14,5 +14,5 @@ def enqueue(self, **kwargs): raise NotImplementedError("Provider not implemented") @abstractmethod - def execute(self, **kwargs): + async def execute(self, **kwargs): raise NotImplementedError("Provider not implemented") diff --git a/kairon/events/definitions/message_broadcast.py b/kairon/events/definitions/message_broadcast.py index b21161f21..e83a7d792 100644 --- a/kairon/events/definitions/message_broadcast.py +++ b/kairon/events/definitions/message_broadcast.py @@ -40,7 +40,7 @@ def validate(self): len(list(MessageBroadcastProcessor.list_settings(self.bot, timestamp__gt=date_today))): raise AppException("Notification scheduling limit reached!") - def execute(self, event_id: Text, **kwargs): + async def execute(self, event_id: Text, **kwargs): """ Execute the event. """ @@ -52,7 +52,7 @@ def execute(self, event_id: Text, **kwargs): reference_id, config = self.__retrieve_config(event_id) broadcast = MessageBroadcastFactory.get_instance(config["connector_type"]).from_config(config, reference_id) recipients = broadcast.get_recipients() - broadcast.send(recipients) + await broadcast.send(recipients) status = EVENT_STATUS.COMPLETED.value except Exception as e: logger.exception(e) diff --git a/kairon/shared/channels/broadcast/whatsapp.py b/kairon/shared/channels/broadcast/whatsapp.py index 0c4a44458..81c4d33e2 100644 --- a/kairon/shared/channels/broadcast/whatsapp.py +++ b/kairon/shared/channels/broadcast/whatsapp.py @@ -1,8 +1,11 @@ +import asyncio import json from typing import List, Text, Dict import pymongo import requests +from loguru import logger +from mongoengine import DoesNotExist from uuid6 import uuid7 from kairon import Utility @@ -17,8 +20,6 @@ from kairon.shared.constants import ChannelTypes, ActorType from kairon.shared.data.constant import EVENT_STATUS from kairon.shared.data.processor import MongoProcessor -from loguru import logger -from mongoengine import DoesNotExist class WhatsappBroadcast(MessageBroadcastFromConfig): @@ -42,9 +43,9 @@ def get_recipients(self, **kwargs): ) return recipients - def send(self, recipients: List, **kwargs): + async def send(self, recipients: List, **kwargs): if self.config["broadcast_type"] == MessageBroadcastType.static.value: - self.__send_using_configuration(recipients) + await self.__send_using_configuration(recipients) else: self.__send_using_pyscript() @@ -57,7 +58,7 @@ def __send_using_pyscript(self): client = self.__get_db_client() def send_msg(template_id: Text, recipient, language_code: Text = "en", components: Dict = None, namespace: Text = None): - response = channel_client.send_template_message(template_id, recipient, language_code, components, namespace) + response = asyncio.run(channel_client.send_template_message(template_id, recipient, language_code, components, namespace)) status = "Failed" if response.get("error") else "Success" raw_template = self.__get_template(template_id, language_code) @@ -87,7 +88,7 @@ def log(**kwargs): **script_variables ) - def __send_using_configuration(self, recipients: List): + async def __send_using_configuration(self, recipients: List): channel_client = self.__get_client() total = len(recipients) db_client = self.__get_db_client() @@ -111,7 +112,7 @@ def __send_using_configuration(self, recipients: List): for recipient, t_params in zip(recipients, template_params): recipient = str(recipient) if recipient else "" if not Utility.check_empty_string(recipient): - response = channel_client.send_template_message(template_id, recipient, lang, t_params, namespace=namespace) + response = await channel_client.send_template_message(template_id, recipient, lang, t_params, namespace=namespace) status = "Failed" if response.get("errors") else "Success" if status == "Failed": failure_cnt = failure_cnt + 1 diff --git a/tests/integration_test/chat_service_test.py b/tests/integration_test/chat_service_test.py index 94341df3d..bd0b38be0 100644 --- a/tests/integration_test/chat_service_test.py +++ b/tests/integration_test/chat_service_test.py @@ -1280,13 +1280,15 @@ def _mock_validate_hub_signature(*args, **kwargs): assert actual == 'not validated' -@responses.activate -def test_whatsapp_valid_text_message_request(): +def test_whatsapp_valid_text_message_request(aioresponses): def _mock_validate_hub_signature(*args, **kwargs): return True - responses.add( - "POST", "https://graph.facebook.com/v13.0/12345678/messages", json={} + aioresponses.add( + method=responses.POST, + url="https://graph.facebook.com/v13.0/12345678/messages", + body="success", + status=200 ) with mock.patch.object(EndpointConfig, "request") as mock_action_execution: mock_action_execution.return_value = {"responses": [{"response": "Welcome to kairon!"}], "events": []} @@ -1335,14 +1337,16 @@ def _mock_validate_hub_signature(*args, **kwargs): channel_type="whatsapp") > 0 -@responses.activate @mock.patch("kairon.chat.handlers.channels.whatsapp.Whatsapp.process_message", autospec=True) -def test_whatsapp_exception_when_try_to_handle_webhook_for_whatsapp_message(mock_process_message): +def test_whatsapp_exception_when_try_to_handle_webhook_for_whatsapp_message(mock_process_message, aioresponses): def _mock_validate_hub_signature(*args, **kwargs): return True - responses.add( - "POST", "https://graph.facebook.com/v13.0/12345678/messages", json={} + aioresponses.add( + method=responses.POST, + url="https://graph.facebook.com/v13.0/12345678/messages", + body="success", + status=200 ) mock_process_message.side_effect = Exception with patch.object(MessengerHandler, "validate_hub_signature", _mock_validate_hub_signature): @@ -1384,15 +1388,16 @@ def _mock_validate_hub_signature(*args, **kwargs): assert actual == 'success' -@responses.activate -def test_whatsapp_valid_button_message_request(): +def test_whatsapp_valid_button_message_request(aioresponses): def _mock_validate_hub_signature(*args, **kwargs): return True - responses.add( - "POST", "https://graph.facebook.com/v13.0/12345678/messages", json={} + aioresponses.add( + method=responses.POST, + url="https://graph.facebook.com/v13.0/12345678/messages", + body="success", + status=200 ) - with patch.object(MessengerHandler, "validate_hub_signature", _mock_validate_hub_signature): with mock.patch("kairon.chat.handlers.channels.whatsapp.Whatsapp._handle_user_message", autospec=True) as whatsapp_msg_handler: @@ -1447,21 +1452,25 @@ def _mock_validate_hub_signature(*args, **kwargs): assert whatsapp_msg_handler.call_args[0][4] == bot -@responses.activate -def test_whatsapp_valid_attachment_message_request(): +def test_whatsapp_valid_attachment_message_request(aioresponses): def _mock_validate_hub_signature(*args, **kwargs): return True responses.reset() - responses.add( - "POST", "https://graph.facebook.com/v13.0/12345678/messages", json={} + aioresponses.add( + method=responses.POST, + url="https://graph.facebook.com/v13.0/12345678/messages", + body="success", + status=200 ) - responses.add( - "GET", "https://graph.facebook.com/v13.0/sdfghj567", - json={ + aioresponses.add( + method=responses.GET, + url="https://graph.facebook.com/v13.0/sdfghj567", + body={ "messaging_product": "whatsapp", "url": "http://kairon-media.url", "id": "sdfghj567" - } + }, + status=200 ) with patch.object(MessengerHandler, "validate_hub_signature", _mock_validate_hub_signature): @@ -1843,15 +1852,16 @@ def _mock_validate_hub_signature(*args, **kwargs): }] -@responses.activate -def test_whatsapp_valid_unsupported_message_request(): +def test_whatsapp_valid_unsupported_message_request(aioresponses): def _mock_validate_hub_signature(*args, **kwargs): return True - responses.add( - "POST", "https://graph.facebook.com/v13.0/12345678/messages", json={} + aioresponses.add( + method=responses.POST, + url="https://graph.facebook.com/v13.0/12345678/messages", + body="success", + status=200 ) - with patch.object(MessengerHandler, "validate_hub_signature", _mock_validate_hub_signature): response = client.post( f"/api/bot/whatsapp/{bot}/{token}", @@ -1891,13 +1901,18 @@ def _mock_validate_hub_signature(*args, **kwargs): assert actual == 'success' -@responses.activate -def test_whatsapp_bsp_valid_text_message_request(): - responses.add( - "POST", "https://waba-v2.360dialog.io/v1/messages", json={} +def test_whatsapp_bsp_valid_text_message_request(aioresponses): + aioresponses.add( + method=responses.POST, + url="https://waba-v2.360dialog.io/v1/messages", + body="success", + status=200 ) - responses.add( - "PUT", 'https://waba-v2.360dialog.io/v1/messages/ABEGkZZXBVAiAhAJeqFQ3Yfld16XGKKsgUYK', json={} + aioresponses.add( + method=responses.PUT, + url="https://waba-v2.360dialog.io/v1/messages/ABEGkZZXBVAiAhAJeqFQ3Yfld16XGKKsgUYK", + body="success", + status=200 ) response = client.post( f"/api/bot/whatsapp/{bot2}/{token}", @@ -1934,16 +1949,20 @@ def test_whatsapp_bsp_valid_text_message_request(): }) actual = response.json() assert actual == 'success' - responses.reset() -@responses.activate -def test_whatsapp_bsp_valid_button_message_request(): - responses.add( - "POST", "https://waba-v2.360dialog.io/v1/messages", json={} +def test_whatsapp_bsp_valid_button_message_request(aioresponses): + aioresponses.add( + method=responses.POST, + url="https://waba-v2.360dialog.io/v1/messages", + body="success", + status=200 ) - responses.add( - "PUT", 'https://waba-v2.360dialog.io/v1/messages/ABEGkZZXBVAiAhAJeqFQ3Yfld16XGKKsgUYK', json={} + aioresponses.add( + method=responses.PUT, + url="https://waba-v2.360dialog.io/v1/messages/ABEGkZZXBVAiAhAJeqFQ3Yfld16XGKKsgUYK", + body="success", + status=200 ) response = client.post( f"/api/bot/whatsapp/{bot2}/{token}", @@ -1981,18 +2000,21 @@ def test_whatsapp_bsp_valid_button_message_request(): }) actual = response.json() assert actual == 'success' - responses.reset() -@responses.activate -def test_whatsapp_bsp_valid_attachment_message_request(): - responses.add( - "POST", "https://waba-v2.360dialog.io/v1/messages", json={} +def test_whatsapp_bsp_valid_attachment_message_request(aioresponses): + aioresponses.add( + method=responses.POST, + url="https://waba-v2.360dialog.io/v1/messages", + body="success", + status=200 ) - responses.add( - "PUT", 'https://waba-v2.360dialog.io/v1/messages/ABEGkZZXBVAiAhAJeqFQ3Yfld16XGKKsgUYK', json={} + aioresponses.add( + method=responses.PUT, + url="https://waba-v2.360dialog.io/v1/messages/ABEGkZZXBVAiAhAJeqFQ3Yfld16XGKKsgUYK", + body="success", + status=200 ) - response = client.post( f"/api/bot/whatsapp/{bot2}/{token}", headers={"hub.verify_token": "valid"}, @@ -2027,15 +2049,15 @@ def test_whatsapp_bsp_valid_attachment_message_request(): }) actual = response.json() assert actual == 'success' - responses.reset() -@responses.activate -def test_whatsapp_bsp_valid_order_message_request(): - responses.add( - "POST", "https://waba-v2.360dialog.io/messages", json={} +def test_whatsapp_bsp_valid_order_message_request(aioresponses): + aioresponses.add( + method=responses.POST, + url="https://waba-v2.360dialog.io/messages", + body="success", + status=200 ) - response = client.post( f"/api/bot/whatsapp/{bot2}/{token}", headers={"hub.verify_token": "valid"}, @@ -2083,7 +2105,6 @@ def test_whatsapp_bsp_valid_order_message_request(): }) actual = response.json() assert actual == 'success' - responses.reset() def add_live_agent_config(bot_id, email): diff --git a/tests/unit_test/channel_client_test.py b/tests/unit_test/channel_client_test.py index a6d4942ae..4c352483e 100644 --- a/tests/unit_test/channel_client_test.py +++ b/tests/unit_test/channel_client_test.py @@ -3,7 +3,6 @@ from unittest.mock import patch import pytest -import responses.matchers from kairon import Utility from kairon.chat.handlers.channels.clients.whatsapp.cloud import WhatsappCloud @@ -21,73 +20,91 @@ def whatsapp_on_premise(self): whatsapp_on_premise = WhatsappOnPremise(access_token=access_token, from_phone_number_id=from_phone_number_id) yield whatsapp_on_premise - def test_send_action(self, whatsapp_on_premise): - with mock.patch.object(whatsapp_on_premise.session, 'post') as mock_post: - mock_post.return_value.json.return_value = {"messages": [{"id": "test_id"}]} - response = whatsapp_on_premise.send_action(payload={"text": "Hi"}) - mock_post.assert_called_once_with( + @pytest.mark.asyncio + async def test_send_action(self, whatsapp_on_premise): + with mock.patch.object(whatsapp_on_premise.client, 'request') as mock_post: + mock_post.return_value = {"messages": [{"id": "test_id"}]} + response = await whatsapp_on_premise.send_action(payload={"text": "Hi"}) + mock_post.assert_awaited_once_with( + 'POST', 'https://graph.facebook.com/v13.0/messages', headers=whatsapp_on_premise.auth_args, - json={'text': 'Hi'}, timeout=None - ) + request_body={'text': 'Hi'}, timeout=None, return_json=False, is_streaming_resp=False, + max_retries=3) assert response == {"messages": [{"id": "test_id"}]} - def test_send_action_failure(self, whatsapp_on_premise): - with mock.patch.object(whatsapp_on_premise.session, 'post') as mock_post: - mock_post.return_value.json.return_value = {"error": {"message": "Message Undeliverable", "code": 400}} - response = whatsapp_on_premise.send_action(payload={"text": " "}) + @pytest.mark.asyncio + async def test_send_action_failure(self, whatsapp_on_premise): + with mock.patch.object(whatsapp_on_premise.client, 'request') as mock_post: + mock_post.return_value = {"error": {"message": "Message Undeliverable", "code": 400}} + response = await whatsapp_on_premise.send_action(payload={"text": " "}) mock_post.assert_called_once_with( + 'POST', 'https://graph.facebook.com/v13.0/messages', headers=whatsapp_on_premise.auth_args, - json={'text': ' '}, timeout=None + request_body={'text': ' '}, timeout=None, return_json=False, is_streaming_resp=False, + max_retries=3 ) assert response == {"error": {"message": "Message Undeliverable", "code": 400}} - def test_get_attachment(self, whatsapp_on_premise): - with mock.patch.object(whatsapp_on_premise.session, 'get') as mock_get: - mock_get.return_value.json.return_value = {"type": "document", "media_id": "test_media_id"} - response = whatsapp_on_premise.get_attachment(media_id="test_media_id") - mock_get.assert_called_once_with( + @pytest.mark.asyncio + async def test_get_attachment(self, whatsapp_on_premise): + with mock.patch.object(whatsapp_on_premise.client, 'request') as mock_get: + mock_get.return_value = {"type": "document", "media_id": "test_media_id"} + response = await whatsapp_on_premise.get_attachment(media_id="test_media_id") + mock_get.assert_awaited_once_with( + 'GET', 'https://graph.facebook.com/v13.0/media/test_media_id', headers=whatsapp_on_premise.auth_args, - timeout=None + timeout=None, return_json=False, + max_retries=3 ) assert response == {"type": "document", "media_id": "test_media_id"} - def test_get_attachment_failure(self, whatsapp_on_premise): - with mock.patch.object(whatsapp_on_premise.session, 'get') as mock_get: - mock_get.return_value.json.return_value = {"error": {"message": "media_id is not valid", "code": 400}} - response = whatsapp_on_premise.get_attachment(media_id="invalid_id") - mock_get.assert_called_once_with( + @pytest.mark.asyncio + async def test_get_attachment_failure(self, whatsapp_on_premise): + with mock.patch.object(whatsapp_on_premise.client, 'request') as mock_get: + mock_get.return_value = {"error": {"message": "media_id is not valid", "code": 400}} + response = await whatsapp_on_premise.get_attachment(media_id="invalid_id") + mock_get.assert_awaited_once_with( + 'GET', 'https://graph.facebook.com/v13.0/media/invalid_id', headers=whatsapp_on_premise.auth_args, - timeout=None + timeout=None, return_json=False, + max_retries=3 ) assert response == {"error": {"message": "media_id is not valid", "code": 400}} - def test_mark_as_read(self, whatsapp_on_premise): - with mock.patch.object(whatsapp_on_premise.session, 'put') as mock_put: - mock_put.return_value.json.return_value = {"id": "test_msg_id"} - response = whatsapp_on_premise.mark_as_read(msg_id="test_msg_id") - mock_put.assert_called_once_with( + @pytest.mark.asyncio + async def test_mark_as_read(self, whatsapp_on_premise): + with mock.patch.object(whatsapp_on_premise.client, 'request') as mock_put: + mock_put.return_value = {"id": "test_msg_id"} + response = await whatsapp_on_premise.mark_as_read(msg_id="test_msg_id") + mock_put.assert_awaited_once_with( + 'PUT', 'https://graph.facebook.com/v13.0/messages/test_msg_id', headers=whatsapp_on_premise.auth_args, - json={'status': 'read'}, timeout=None + request_body={'status': 'read'}, timeout=None, return_json=False, + max_retries=3 ) assert response == {"id": "test_msg_id"} - def test_mark_as_read_failure(self, whatsapp_on_premise): - with mock.patch.object(whatsapp_on_premise.session, 'put') as mock_put: - mock_put.return_value.json.return_value = {"error": {"message": "msg_id is not valid", "code": 400}} - response = whatsapp_on_premise.mark_as_read(msg_id="invalid_id") - mock_put.assert_called_once_with( + @pytest.mark.asyncio + async def test_mark_as_read_failure(self, whatsapp_on_premise): + with mock.patch.object(whatsapp_on_premise.client, 'request') as mock_put: + mock_put.return_value = {"error": {"message": "msg_id is not valid", "code": 400}} + response = await whatsapp_on_premise.mark_as_read(msg_id="invalid_id") + mock_put.assert_awaited_once_with( + "PUT", 'https://graph.facebook.com/v13.0/messages/invalid_id', headers=whatsapp_on_premise.auth_args, - json={'status': 'read'}, timeout=None + request_body={'status': 'read'}, timeout=None, return_json=False, + max_retries=3 ) assert response == {"error": {"message": "msg_id is not valid", "code": 400}} - def test_send_template_message(self, whatsapp_on_premise): + @pytest.mark.asyncio + async def test_send_template_message(self, whatsapp_on_premise): namespace = "test_namespace" name = "test_template_name" to_phone_number = "9876543210" @@ -95,7 +112,7 @@ def test_send_template_message(self, whatsapp_on_premise): autospec=True) as mock_send: mock_send.return_value = { "contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} - response = whatsapp_on_premise.send_template_message(namespace=namespace, name=name, + response = await whatsapp_on_premise.send_template_message(namespace=namespace, name=name, to_phone_number=to_phone_number) assert response == {"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} assert mock_send.call_args[0][1] == {'language': {'code': 'en', 'policy': 'deterministic'}, @@ -103,14 +120,15 @@ def test_send_template_message(self, whatsapp_on_premise): assert mock_send.call_args[0][2] == '9876543210' assert mock_send.call_args[1] == {'messaging_type': 'template'} - def test_send_template_message_failure(self, whatsapp_on_premise): + @pytest.mark.asyncio + async def test_send_template_message_failure(self, whatsapp_on_premise): namespace = "test_namespace" name = "test_template_name" to_phone_number = "invalid_ph_no" with patch("kairon.chat.handlers.channels.clients.whatsapp.on_premise.WhatsappOnPremise.send", autospec=True) as mock_send: mock_send.return_value = {"error": {"message": "to_phone_number is not valid", "code": 400}} - response = whatsapp_on_premise.send_template_message(namespace=namespace, name=name, + response = await whatsapp_on_premise.send_template_message(namespace=namespace, name=name, to_phone_number=to_phone_number) assert response == {"error": {"message": "to_phone_number is not valid", "code": 400}} assert mock_send.call_args[0][1] == {'language': {'code': 'en', 'policy': 'deterministic'}, @@ -118,7 +136,8 @@ def test_send_template_message_failure(self, whatsapp_on_premise): assert mock_send.call_args[0][2] == 'invalid_ph_no' assert mock_send.call_args[1] == {'messaging_type': 'template'} - def test_send_template_message_without_namespace(self, whatsapp_on_premise): + @pytest.mark.asyncio + async def test_send_template_message_without_namespace(self, whatsapp_on_premise): name = "test_template_name" to_phone_number = "invalid_ph_no" with patch("kairon.chat.handlers.channels.clients.whatsapp.on_premise.WhatsappOnPremise.send", @@ -126,7 +145,7 @@ def test_send_template_message_without_namespace(self, whatsapp_on_premise): mock_send.return_value = {"error": {"message": "to_phone_number is not valid", "code": 400}} with pytest.raises(AppException, match="namespace is required to send messages using on-premises api!"): - whatsapp_on_premise.send_template_message(name=name, to_phone_number=to_phone_number) + await whatsapp_on_premise.send_template_message(name=name, to_phone_number=to_phone_number) class TestWhatsappCloud: @@ -141,7 +160,8 @@ def whatsapp_cloud(self): whatsapp_cloud = WhatsappCloud(access_token=access_token, from_phone_number_id=from_phone_number_id) yield whatsapp_cloud - def test_whatsapp_cloud_send_template_message(self, whatsapp_cloud): + @pytest.mark.asyncio + async def test_whatsapp_cloud_send_template_message(self, whatsapp_cloud): name = "test_template_name" to_phone_number = "9876543210" components = { @@ -171,7 +191,7 @@ def test_whatsapp_cloud_send_template_message(self, whatsapp_cloud): autospec=True) as mock_send: mock_send.return_value = { "contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} - response = whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number, + response = await whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number, components=components) assert mock_send.call_args[0][1] == {'language': {'code': 'en'}, 'name': 'test_template_name', 'components': {'type': 'body', @@ -188,20 +208,22 @@ def test_whatsapp_cloud_send_template_message(self, whatsapp_cloud): assert mock_send.call_args[1] == {'messaging_type': 'template'} assert response == {"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} - def test_whatsapp_cloud_send_template_message_without_payload(self, whatsapp_cloud): + @pytest.mark.asyncio + async def test_whatsapp_cloud_send_template_message_without_payload(self, whatsapp_cloud): name = "test_template_name" to_phone_number = "9876543210" with patch("kairon.chat.handlers.channels.clients.whatsapp.cloud.WhatsappCloud.send", autospec=True) as mock_send: mock_send.return_value = { "contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} - response = whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number) + response = await whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number) assert mock_send.call_args[0][1] == {'language': {'code': 'en'}, 'name': 'test_template_name'} assert mock_send.call_args[0][2] == "9876543210" assert mock_send.call_args[1] == {'messaging_type': 'template'} assert response == {"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} - def test_whatsapp_cloud_send_template_message_with_namespace(self, whatsapp_cloud): + @pytest.mark.asyncio + async def test_whatsapp_cloud_send_template_message_with_namespace(self, whatsapp_cloud): namespace = "test_namespace" name = "test_template_name" to_phone_number = "9876543210" @@ -209,24 +231,25 @@ def test_whatsapp_cloud_send_template_message_with_namespace(self, whatsapp_clou autospec=True) as mock_send: mock_send.return_value = { "contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} - response = whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number, + response = await whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number, namespace=namespace) assert mock_send.call_args[0][1] == {'language': {'code': 'en'}, 'name': 'test_template_name'} assert mock_send.call_args[0][2] == "9876543210" assert mock_send.call_args[1] == {'messaging_type': 'template'} assert response == {"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} - def test_whatsapp_cloud_send_template_message_failure(self, whatsapp_cloud): + @pytest.mark.asyncio + async def test_whatsapp_cloud_send_template_message_failure(self, whatsapp_cloud): name = "test_template_name" to_phone_number = "invalid_ph_no" with patch("kairon.chat.handlers.channels.clients.whatsapp.cloud.WhatsappCloud.send", autospec=True) as mock_send: mock_send.return_value = {"error": {"message": "to_phone_number is not valid", "code": 400}} - response = whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number) + response = await whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number) assert response == {"error": {"message": "to_phone_number is not valid", "code": 400}} - @responses.activate - def test_whatsapp_cloud_send_template_message_with_360dialog(self): + @pytest.mark.asyncio + async def test_whatsapp_cloud_send_template_message_with_360dialog(self, aioresponses): name = "test_template_name" access_token = "ERTYUIEFDGHGFHJKLFGHJKGHJ" from_phone_number_id = "918958030415" @@ -255,51 +278,40 @@ def test_whatsapp_cloud_send_template_message_with_360dialog(self): ] } - responses.add( - "POST", 'https://waba-v2.360dialog.io/messages', - json={"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]}, - match=[responses.matchers.json_params_matcher({'messaging_product': 'whatsapp', - 'recipient_type': 'individual', 'to': '9876543210', - 'type': 'template', - 'template': {'language': {'code': 'en'}, - 'name': 'test_template_name', - 'components': {'type': 'body', - 'parameters': [ - {'type': 'text', - 'text': 'text-string'}, - {'type': 'currency', - 'currency': { - 'fallback_value': 'VALUE', - 'code': 'USD', - 'amount_1000': '1000'}}, - {'type': 'date_time', - 'date_time': { - 'fallback_value': 'DATE'}}]}}}), - responses.matchers.header_matcher({'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'})] + aioresponses.add( + url='https://waba-v2.360dialog.io/messages', + method="POST", + status=200, + headers={'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'}, + payload={"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} ) whatsapp_cloud = BSP360Dialog(access_token=access_token, from_phone_number_id=from_phone_number_id) - response = whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number, + response = await whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number, components=components) - assert response == {"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} + assert list(aioresponses.requests.values())[0][0][1]['headers'] == {'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'} + assert list(aioresponses.requests.values())[0][0][1]['json'] == {'messaging_product': 'whatsapp', 'recipient_type': 'individual', 'to': '9876543210', 'type': 'template', 'template': {'language': {'code': 'en'}, 'name': 'test_template_name', 'components': {'type': 'body', 'parameters': [{'type': 'text', 'text': 'text-string'}, {'type': 'currency', 'currency': {'fallback_value': 'VALUE', 'code': 'USD', 'amount_1000': '1000'}}, {'type': 'date_time', 'date_time': {'fallback_value': 'DATE'}}]}}} + resp = await response.text() + assert resp == '{"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]}' - @responses.activate - def test_whatsapp_cloud_mark_read_360dialog(self): + @pytest.mark.asyncio + async def test_whatsapp_cloud_mark_read_360dialog(self, aioresponses): access_token = "ERTYUIEFDGHGFHJKLFGHJKGHJ" from_phone_number_id = "918958030415" - - responses.add( - "POST", 'https://waba-v2.360dialog.io/messages', - json={"success": True}, - match=[responses.matchers.json_params_matcher({"messaging_product": "whatsapp", "status": "read", - "message_id": "ASDFHJKJT"}), - responses.matchers.header_matcher({'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'})] + aioresponses.add( + url='https://waba-v2.360dialog.io/messages', + method="POST", + status=200, + headers={'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'}, + payload={"success": True} ) whatsapp_cloud = BSP360Dialog(access_token=access_token, from_phone_number_id=from_phone_number_id) - response = whatsapp_cloud.mark_as_read("ASDFHJKJT") - assert response == {"success": True} + response = await whatsapp_cloud.mark_as_read("ASDFHJKJT") + assert list(aioresponses.requests.values())[0][0][1]['json'] == {'messaging_product': 'whatsapp', 'status': 'read', 'message_id': 'ASDFHJKJT'} + resp = await response.text() + assert resp == '{"success": true}' - @responses.activate - def test_whatsapp_cloud_send_template_message_with_360dialog_failure(self): + @pytest.mark.asyncio + async def test_whatsapp_cloud_send_template_message_with_360dialog_failure(self, aioresponses): name = "test_template_name" access_token = "ERTYUIEFDGHGFHJKLFGHJKGHJ" from_phone_number_id = "918958030415" @@ -341,10 +353,16 @@ def test_whatsapp_cloud_send_template_message_with_360dialog_failure(self): ] } - responses.add( - "POST", 'https://waba-v2.360dialog.io/messages', - json=error_msg, status=404, - match=[responses.matchers.json_params_matcher({'messaging_product': 'whatsapp', + aioresponses.add( + url='https://waba-v2.360dialog.io/messages', + method="POST", + headers={'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'}, + payload=error_msg + ) + whatsapp_cloud = BSP360Dialog(access_token=access_token, from_phone_number_id=from_phone_number_id) + response = await whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number, + components=components) + assert list(aioresponses.requests.values())[0][0][1]['json'] == {'messaging_product': 'whatsapp', 'recipient_type': 'individual', 'to': '9876543210', 'type': 'template', 'template': {'language': {'code': 'en'}, @@ -360,16 +378,13 @@ def test_whatsapp_cloud_send_template_message_with_360dialog_failure(self): 'amount_1000': '1000'}}, {'type': 'date_time', 'date_time': { - 'fallback_value': 'DATE'}}]}}}), - responses.matchers.header_matcher({'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'})] - ) - whatsapp_cloud = BSP360Dialog(access_token=access_token, from_phone_number_id=from_phone_number_id) - response = whatsapp_cloud.send_template_message(name=name, to_phone_number=to_phone_number, - components=components) - assert response == error_msg + 'fallback_value': 'DATE'}}]}}} + resp = await response.text() + print(resp) + assert resp == '{"error": {"message": "(#131009) Parameter value is not valid", "type": "OAuthException", "code": 131009, "error_data": {"messaging_product": "whatsapp", "details": "Please check the parameters you have provided."}, "error_subcode": 2494010, "fbtrace_id": "A_lIoKUKB2unS85jgB4Gl7B"}}' - @responses.activate - def test_whatsapp_cloud_mark_read_360dialog_failure(self): + @pytest.mark.asyncio + async def test_whatsapp_cloud_mark_read_360dialog_failure(self, aioresponses): access_token = "ERTYUIEFDGHGFHJKLFGHJKGHJ" from_phone_number_id = "918958030415" error_msg = { @@ -386,19 +401,22 @@ def test_whatsapp_cloud_mark_read_360dialog_failure(self): } } - responses.add( - "POST", 'https://waba-v2.360dialog.io/messages', - json=error_msg, status=404, - match=[responses.matchers.json_params_matcher({"messaging_product": "whatsapp", "status": "read", - "message_id": "ASDFHJKJT"}), - responses.matchers.header_matcher({'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'})] + aioresponses.add( + url='https://waba-v2.360dialog.io/messages', + method="POST", + headers={'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'}, + payload=error_msg ) whatsapp_cloud = BSP360Dialog(access_token=access_token, from_phone_number_id=from_phone_number_id) - response = whatsapp_cloud.mark_as_read("ASDFHJKJT") - assert response == error_msg - - @responses.activate - def test_whatsapp_cloud_send_message_with_360dialog(self): + response = await whatsapp_cloud.mark_as_read("ASDFHJKJT") + assert list(aioresponses.requests.values())[0][0][1]['json'] == {'messaging_product': 'whatsapp', + 'status': 'read', 'message_id': 'ASDFHJKJT'} + resp = await response.text() + print(resp) + assert resp == '{"error": {"message": "(#131009) Parameter value is not valid", "type": "OAuthException", "code": 131009, "error_data": {"messaging_product": "whatsapp", "details": "Please check the message ID you have provided."}, "error_subcode": 2494010, "fbtrace_id": "A_lIoKUKB2unS85jgB4Gl7B"}}' + + @pytest.mark.asyncio + async def test_whatsapp_cloud_send_message_with_360dialog(self, aioresponses): access_token = "ERTYUIEFDGHGFHJKLFGHJKGHJ" from_phone_number_id = "918958030415" to_phone_number = "9876543210" @@ -407,24 +425,27 @@ def test_whatsapp_cloud_send_message_with_360dialog(self): "body": "You have to check out this amazing messaging service https://www.whatsapp.com/" } - responses.add( - "POST", 'https://waba-v2.360dialog.io/messages', - json={"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]}, - match=[responses.matchers.json_params_matcher({ + aioresponses.add( + url='https://waba-v2.360dialog.io/messages', + method="POST", + status=200, + headers={'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'}, + payload={"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} + ) + whatsapp_cloud = BSP360Dialog(access_token=access_token, from_phone_number_id=from_phone_number_id) + response = await whatsapp_cloud.send(payload, to_phone_number, "text") + assert list(aioresponses.requests.values())[0][0][1]['json'] == { 'messaging_product': "whatsapp", 'recipient_type': "individual", "to": to_phone_number, "type": "text", "text": payload - }), - responses.matchers.header_matcher({'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'})] - ) - whatsapp_cloud = BSP360Dialog(access_token=access_token, from_phone_number_id=from_phone_number_id) - response = whatsapp_cloud.send(payload, to_phone_number, "text") - assert response == {"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]} + } + resp = await response.text() + assert resp == '{"contacts": [{"input": "+55123456789", "status": "valid", "wa_id": "55123456789"}]}' - @responses.activate - def test_whatsapp_cloud_send_message_with_360dialog_failure(self): + @pytest.mark.asyncio + async def test_whatsapp_cloud_send_message_with_360dialog_failure(self, aioresponses): access_token = "ERTYUIEFDGHGFHJKLFGHJKGHJ" from_phone_number_id = "918958030415" to_phone_number = "9876543210" @@ -446,18 +467,21 @@ def test_whatsapp_cloud_send_message_with_360dialog_failure(self): } } - responses.add( - "POST", 'https://waba-v2.360dialog.io/messages', status=404, - json=error_msg, - match=[responses.matchers.json_params_matcher({ + aioresponses.add( + url='https://waba-v2.360dialog.io/messages', + method="POST", + headers={'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'}, + payload=error_msg + ) + whatsapp_cloud = BSP360Dialog(access_token=access_token, from_phone_number_id=from_phone_number_id) + response = await whatsapp_cloud.send(payload, to_phone_number, "text") + assert list(aioresponses.requests.values())[0][0][1]['json'] == { 'messaging_product': "whatsapp", 'recipient_type': "individual", "to": to_phone_number, "type": "text", "text": payload - }), - responses.matchers.header_matcher({'D360-API-KEY': 'ERTYUIEFDGHGFHJKLFGHJKGHJ'})] - ) - whatsapp_cloud = BSP360Dialog(access_token=access_token, from_phone_number_id=from_phone_number_id) - response = whatsapp_cloud.send(payload, to_phone_number, "text") - assert response == error_msg + } + resp = await response.text() + print(resp) + assert resp == '{"error": {"message": "(#131009) Parameter value is not valid", "type": "OAuthException", "code": 131009, "error_data": {"messaging_product": "whatsapp", "details": "Please check the message ID you have provided."}, "error_subcode": 2494010, "fbtrace_id": "A_lIoKUKB2unS85jgB4Gl7B"}}' diff --git a/tests/unit_test/events/events_test.py b/tests/unit_test/events/events_test.py index 85dd0d3bf..e88d79277 100644 --- a/tests/unit_test/events/events_test.py +++ b/tests/unit_test/events/events_test.py @@ -1187,6 +1187,7 @@ def test_trigger_history_deletion_for_bot(self): assert not logs[0].get('end_timestamp') assert logs[0]['status'] == EVENT_STATUS.ENQUEUED.value + @pytest.mark.asyncio @responses.activate @mongomock.patch(servers=(('localhost', 27017),)) @patch("kairon.shared.channels.whatsapp.bsp.dialog360.BSP360Dialog.get_partner_auth_token", autospec=True) @@ -1194,7 +1195,7 @@ def test_trigger_history_deletion_for_bot(self): @patch("kairon.shared.data.processor.MongoProcessor.get_bot_settings") @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_with_logs_modification(self, mock_is_exist, mock_channel_config, + async def test_execute_message_broadcast_with_logs_modification(self, mock_is_exist, mock_channel_config, mock_get_bot_settings, mock_send, mock_get_partner_auth_token): bot = 'test_execute_message_broadcast_with_logs_modification' @@ -1284,7 +1285,7 @@ def test_execute_message_broadcast_with_logs_modification(self, mock_is_exist, m event = MessageBroadcastEvent(bot, user) event.validate() event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) logs = MessageBroadcastProcessor.get_broadcast_logs(bot) assert len(logs[0]) == logs[1] == 2 @@ -1312,6 +1313,7 @@ def test_execute_message_broadcast_with_logs_modification(self, mock_is_exist, m result = MessageBroadcastProcessor.get_channel_metrics(ChannelTypes.WHATSAPP.value, bot) assert result == [{'status': 'Failed', 'campaign_id': reference_id, 'count': 1}] + @pytest.mark.asyncio @responses.activate @mongomock.patch(servers=(('localhost', 27017),)) @patch("kairon.shared.channels.whatsapp.bsp.dialog360.BSP360Dialog.get_partner_auth_token", autospec=True) @@ -1319,7 +1321,7 @@ def test_execute_message_broadcast_with_logs_modification(self, mock_is_exist, m @patch("kairon.shared.data.processor.MongoProcessor.get_bot_settings") @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_with_static_values(self, mock_is_exist, mock_channel_config, + async def test_execute_message_broadcast_with_static_values(self, mock_is_exist, mock_channel_config, mock_get_bot_settings, mock_send, mock_get_partner_auth_token): bot = 'test_execute_message_broadcast' @@ -1386,7 +1388,7 @@ def test_execute_message_broadcast_with_static_values(self, mock_is_exist, mock_ event = MessageBroadcastEvent(bot, user) event.validate() event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) logs = MessageBroadcastProcessor.get_broadcast_logs(bot) assert len(logs[0]) == logs[1] == 2 @@ -1417,13 +1419,14 @@ def test_execute_message_broadcast_with_static_values(self, mock_is_exist, mock_ assert len(settings) == 1 assert settings[0]["status"] == False + @pytest.mark.asyncio @mongomock.patch(servers=(('localhost', 27017),)) @patch("kairon.shared.channels.whatsapp.bsp.dialog360.BSP360Dialog.get_partner_auth_token", autospec=True) @patch("kairon.chat.handlers.channels.clients.whatsapp.dialog360.BSP360Dialog.send_template_message", autospec=True) @patch("kairon.shared.data.processor.MongoProcessor.get_bot_settings") @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_with_dynamic_values(self, mock_is_exist, mock_channel_config, + async def test_execute_message_broadcast_with_dynamic_values(self, mock_is_exist, mock_channel_config, mock_get_bot_settings, mock_send, mock_get_partner_auth_token): bot = 'test_execute_dynamic_message_broadcast' @@ -1496,7 +1499,7 @@ def test_execute_message_broadcast_with_dynamic_values(self, mock_is_exist, mock event = MessageBroadcastEvent(bot, user) event.validate() event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) responses.reset() logs = MessageBroadcastProcessor.get_broadcast_logs(bot) @@ -1549,11 +1552,12 @@ def test_execute_message_broadcast_with_dynamic_values(self, mock_is_exist, mock 'link': 'https://drive.google.com/uc?export=download&id=1GXQ43jilSDelRvy1kr3PNNpl1e21dRXm', 'filename': 'Brochure.pdf'}}]}] + @pytest.mark.asyncio @patch("kairon.chat.handlers.channels.clients.whatsapp.dialog360.BSP360Dialog.send_template_message", autospec=True) @patch("kairon.shared.data.processor.MongoProcessor.get_bot_settings") @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_with_recipient_evaluation_failure(self, mock_is_exist, mock_channel_config, + async def test_execute_message_broadcast_with_recipient_evaluation_failure(self, mock_is_exist, mock_channel_config, mock_get_bot_settings, mock_send): bot = 'test_execute_dynamic_message_broadcast_recipient_evaluation_failure' user = 'test_user' @@ -1588,7 +1592,7 @@ def test_execute_message_broadcast_with_recipient_evaluation_failure(self, mock_ event = MessageBroadcastEvent(bot, user) event.validate() event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) responses.reset() logs = MessageBroadcastProcessor.get_broadcast_logs(bot) @@ -1611,10 +1615,11 @@ def test_execute_message_broadcast_with_recipient_evaluation_failure(self, mock_ with pytest.raises(AppException, match="Notification settings not found!"): MessageBroadcastProcessor.get_settings(event_id, bot) + @pytest.mark.asyncio @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") @patch("kairon.shared.data.processor.MongoProcessor.get_bot_settings") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_expression_evaluation_failure(self, mock_is_exist, mock_get_bot_settings, + async def test_execute_message_broadcast_expression_evaluation_failure(self, mock_is_exist, mock_get_bot_settings, mock_channel_config): bot = 'test_execute_message_broadcast_expression_evaluation_failure' user = 'test_user' @@ -1647,7 +1652,7 @@ def test_execute_message_broadcast_expression_evaluation_failure(self, mock_is_e event = MessageBroadcastEvent(bot, user) event.validate() event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) logs = MessageBroadcastProcessor.get_broadcast_logs(bot) assert len(logs[0]) == logs[1] == 1 @@ -1655,10 +1660,11 @@ def test_execute_message_broadcast_expression_evaluation_failure(self, mock_is_e assert exception.startswith('Failed to evaluate template: ') responses.reset() + @pytest.mark.asyncio @patch("kairon.chat.handlers.channels.clients.whatsapp.dialog360.BSP360Dialog.send_template_message", autospec=True) @patch("kairon.shared.data.processor.MongoProcessor.get_bot_settings") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_with_channel_deleted(self, mock_is_exist, mock_get_bot_settings, mock_send): + async def test_execute_message_broadcast_with_channel_deleted(self, mock_is_exist, mock_get_bot_settings, mock_send): bot = 'test_execute_message_broadcast_with_channel_deleted' user = 'test_user' config = { @@ -1695,7 +1701,7 @@ def test_execute_message_broadcast_with_channel_deleted(self, mock_is_exist, moc "config": {"access_token": "shjkjhrefdfghjkl", "from_phone_number_id": "918958030415"}} event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) responses.reset() logs = MessageBroadcastProcessor.get_broadcast_logs(bot) @@ -1718,6 +1724,7 @@ def test_execute_message_broadcast_with_channel_deleted(self, mock_is_exist, moc with pytest.raises(AppException, match="Notification settings not found!"): MessageBroadcastProcessor.get_settings(event_id, bot) + @pytest.mark.asyncio @responses.activate @mongomock.patch(servers=(('localhost', 27017),)) @patch("kairon.shared.channels.whatsapp.bsp.dialog360.BSP360Dialog.get_partner_auth_token", autospec=True) @@ -1725,7 +1732,7 @@ def test_execute_message_broadcast_with_channel_deleted(self, mock_is_exist, moc @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") @patch("kairon.chat.handlers.channels.clients.whatsapp.dialog360.BSP360Dialog.send_template_message") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_evaluate_template_parameters(self, mock_is_exist, mock_send, mock_channel_config, + async def test_execute_message_broadcast_evaluate_template_parameters(self, mock_is_exist, mock_send, mock_channel_config, mock_get_bot_settings, mock_bsp_auth_token): bot = 'test_execute_message_broadcast_evaluate_template_parameters' user = 'test_user' @@ -1793,7 +1800,7 @@ def test_execute_message_broadcast_evaluate_template_parameters(self, mock_is_ex event = MessageBroadcastEvent(bot, user) event.validate() event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) logs = MessageBroadcastProcessor.get_broadcast_logs(bot) @@ -1843,13 +1850,14 @@ def test_base_scheduler_class(self): with pytest.raises(Exception): ScheduledEventsBase("test", "test").enqueue(event_request_type, config={}) + @pytest.mark.asyncio @mongomock.patch(servers=(('localhost', 27017),)) @patch("kairon.shared.channels.whatsapp.bsp.dialog360.BSP360Dialog.get_partner_auth_token", autospec=True) @patch("kairon.chat.handlers.channels.clients.whatsapp.dialog360.BSP360Dialog.send_template_message", autospec=True) @patch("kairon.shared.data.processor.MongoProcessor.get_bot_settings") @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_with_pyscript(self, mock_is_exist, mock_channel_config, + async def test_execute_message_broadcast_with_pyscript(self, mock_is_exist, mock_channel_config, mock_get_bot_settings, mock_send, mock_get_partner_auth_token): bot = 'test_execute_message_broadcast_with_pyscript' user = 'test_user' @@ -1927,7 +1935,7 @@ def test_execute_message_broadcast_with_pyscript(self, mock_is_exist, mock_chann event = MessageBroadcastEvent(bot, user) event.validate() event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) responses.reset() logs = MessageBroadcastProcessor.get_broadcast_logs(bot) @@ -2000,10 +2008,11 @@ def test_execute_message_broadcast_with_pyscript(self, mock_is_exist, mock_chann # 'filename': 'Brochure.pdf'}}]}] # assert mock_send.call_args[0][5] == '13b1e228_4a08_4d19_a0da_cdb80bc76380' + @pytest.mark.asyncio @patch("kairon.shared.data.processor.MongoProcessor.get_bot_settings") @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_with_pyscript_failure(self, mock_is_exist, mock_channel_config, + async def test_execute_message_broadcast_with_pyscript_failure(self, mock_is_exist, mock_channel_config, mock_get_bot_settings): bot = 'test_execute_message_broadcast_with_pyscript_failure' user = 'test_user' @@ -2032,7 +2041,7 @@ def test_execute_message_broadcast_with_pyscript_failure(self, mock_is_exist, mo event = MessageBroadcastEvent(bot, user) event.validate() event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) responses.reset() logs = MessageBroadcastProcessor.get_broadcast_logs(bot) @@ -2054,11 +2063,12 @@ def test_execute_message_broadcast_with_pyscript_failure(self, mock_is_exist, mo with pytest.raises(AppException, match="Notification settings not found!"): MessageBroadcastProcessor.get_settings(event_id, bot) + @pytest.mark.asyncio @patch("kairon.shared.channels.broadcast.whatsapp.json") @patch("kairon.shared.data.processor.MongoProcessor.get_bot_settings") @patch("kairon.shared.chat.processor.ChatDataProcessor.get_channel_config") @patch("kairon.shared.utils.Utility.is_exist", autospec=True) - def test_execute_message_broadcast_with_pyscript_timeout(self, mock_is_exist, mock_channel_config, + async def test_execute_message_broadcast_with_pyscript_timeout(self, mock_is_exist, mock_channel_config, mock_get_bot_settings, mock_json): import time @@ -2093,7 +2103,7 @@ def sleep_for_some_time(*args, **kwargs): event = MessageBroadcastEvent(bot, user) event.validate() event_id = event.enqueue(EventRequestType.trigger_async.value, config=config) - event.execute(event_id) + result = await event.execute(event_id) responses.reset() logs = MessageBroadcastProcessor.get_broadcast_logs(bot)