Skip to content

Commit

Permalink
Async Rest client changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Nupur Khare committed Jan 3, 2024
1 parent 44fc36f commit 729d0a5
Show file tree
Hide file tree
Showing 13 changed files with 602 additions and 398 deletions.
2 changes: 1 addition & 1 deletion kairon/actions/definitions/vector_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
operation_type = vector_action_config['query_type']
payload_type = vector_action_config['payload']
request_body = tracker.get_slot(payload_type.get('value')) if payload_type.get('type') == DbQueryValueType.from_slot.value \
else payload_type.get('value')
else ActionUtility.check_request_body_type(payload_type.get('value'))
msg_logger.append(request_body)
tracker_data = ActionUtility.build_context(tracker, True)
response = vector_db.perform_operation(operation_type, request_body)
Expand Down
54 changes: 26 additions & 28 deletions kairon/chat/handlers/channels/clients/whatsapp/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import logging
from typing import Text, Dict

import requests

from kairon import Utility
from kairon.exceptions import AppException

Expand All @@ -26,21 +24,21 @@ class WhatsappCloud(object):
'template'
}

def __init__(self, access_token, **kwargs):
def __init__(self, access_token, session, **kwargs):
"""
@required:
access_token
@optional:
session
client
api_version
app_secret
"""

self.access_token = access_token
self.session = session
self.from_phone_number_id = kwargs.get('from_phone_number_id')
if self.client_type == "meta" and Utility.check_empty_string(self.from_phone_number_id):
raise AppException("missing parameter 'from_phone_number_id'")
self.session = kwargs.get('session', requests.Session())
self.api_version = kwargs.get('api_version', DEFAULT_API_VERSION)
self.app = 'https://graph.facebook.com/v{api_version}'.format(api_version=self.api_version)
self.app_secret = kwargs.get('app_secret')
Expand All @@ -61,7 +59,7 @@ def auth_args(self):
self._auth_args = auth
return self._auth_args

def send(self, payload, to_phone_number, messaging_type, recipient_type='individual', timeout=None, tag=None):
async def send(self, payload, to_phone_number, messaging_type, recipient_type='individual', timeout=None, tag=None):
"""
@required:
payload: message request payload
Expand All @@ -87,9 +85,10 @@ def send(self, payload, to_phone_number, messaging_type, recipient_type='individ
if tag:
body['tag'] = tag

return self.send_action(body)
response = await self.send_action(body)
return response

def send_json(self, payload: dict, to_phone_number, recipient_type='individual', timeout=None):
async def send_json(self, payload: dict, to_phone_number, recipient_type='individual', timeout=None):
"""
@required:
payload: message request payload
Expand All @@ -105,46 +104,44 @@ def send_json(self, payload: dict, to_phone_number, recipient_type='individual',
"to": to_phone_number
})

return self.send_action(payload)
response = await self.send_action(payload)
return response

def send_action(self, payload, timeout=None, **kwargs):
async def send_action(self, payload, timeout=None, **kwargs):
"""
@required:
payload: message request payload
@optional:
timeout: request timeout
@outputs: response json
"""
r = self.session.post(
'{app}/{from_phone_number_id}/messages'.format(app=self.app, from_phone_number_id=self.from_phone_number_id),
params=self.auth_args,
json=payload,
timeout=timeout
)
resp = r.json()
url = '{app}/{from_phone_number_id}/messages'.format(app=self.app,
from_phone_number_id=self.from_phone_number_id)
response = await self.session.request("POST", url, headers=self.auth_args, request_body=payload, timeout=timeout,
return_json=False)
resp = await response.text()
logger.debug(resp)
return resp

def get_attachment(self, attachment_id, timeout=None):
async def get_attachment(self, attachment_id, timeout=None):
"""
@required:
attachment_id: audio/video/image/document id
@optional:
timeout: request timeout
@outputs: response json
"""
r = self.session.get(
'{app}/{attachment_id}'.format(app=self.app, attachment_id=attachment_id),
params=self.auth_args,
timeout=timeout
)
resp = r.json()
url = '{app}/{attachment_id}'.format(app=self.app, attachment_id=attachment_id)
response = await self.session.request("GET", url, headers=self.auth_args, timeout=timeout,
return_json=False)
resp = await response.text()
logger.debug(resp)
return resp

def mark_as_read(self, msg_id, timeout=None):
async def mark_as_read(self, msg_id, timeout=None):
payload = {"messaging_product": "whatsapp", "status": "read", "message_id": msg_id}
return self.send_action(payload)
response = await self.send_action(payload)
return response

def generate_appsecret_proof(self):
"""
Expand All @@ -157,7 +154,7 @@ def generate_appsecret_proof(self):

return hmac.new(app_secret, access_token, hashlib.sha256).hexdigest()

def send_template_message(self, name: Text, to_phone_number, language_code: Text = "en", components: Dict = None, namespace: Text = None):
async def send_template_message(self, name: Text, to_phone_number, language_code: Text = "en", components: Dict = None, namespace: Text = None):
payload = {
"language": {
"code": language_code
Expand All @@ -166,4 +163,5 @@ def send_template_message(self, name: Text, to_phone_number, language_code: Text
}
if components:
payload.update({"components": components})
return self.send(payload, to_phone_number, messaging_type="template")
response = await self.send(payload, to_phone_number, messaging_type="template")
return response
26 changes: 13 additions & 13 deletions kairon/chat/handlers/channels/clients/whatsapp/dialog360.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from loguru import logger

from kairon import Utility
from kairon.chat.handlers.channels.clients.whatsapp.cloud import WhatsappCloud
from kairon.shared.constants import WhatsappBSPTypes
from loguru import logger


class BSP360Dialog(WhatsappCloud):

def __init__(self, access_token, **kwargs):
super().__init__(access_token, **kwargs)
def __init__(self, access_token, session, **kwargs):
super().__init__(access_token, session, **kwargs)
self.access_token = access_token
self.session = session
self.base_url = Utility.system_metadata["channels"]["whatsapp"]["business_providers"]["360dialog"]["waba_base_url"]
self.auth_header = Utility.system_metadata["channels"]["whatsapp"]["business_providers"]["360dialog"]["auth_header"]
self.app = f'{self.base_url}'
Expand All @@ -23,24 +25,22 @@ def auth_args(self):
self._auth_args = {self.auth_header: self.access_token}
return self._auth_args

def send_action(self, payload, timeout=None, **kwargs):
async def send_action(self, payload, timeout=None, **kwargs):
"""
@required:
payload: message request payload
@optional:
timeout: request timeout
@outputs: response json
"""
r = self.session.post(
'{app}/messages'.format(app=self.app),
headers=self.auth_args,
json=payload,
timeout=timeout
)
resp = r.json()
url = '{app}/messages'.format(app=self.app)
response = await self.session.request("POST", url, headers=self.auth_args, request_body=payload, timeout=timeout,
return_json=False)
resp = await response.text()
logger.debug(resp)
return resp

def mark_as_read(self, msg_id, timeout=None):
async def mark_as_read(self, msg_id, timeout=None):
payload = {"messaging_product": "whatsapp", "status": "read", "message_id": msg_id}
return self.send_action(payload)
response = await self.send_action(payload)
return response
57 changes: 25 additions & 32 deletions kairon/chat/handlers/channels/clients/whatsapp/on_premise.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import logging
from typing import Text, Dict

import requests

from kairon.chat.handlers.channels.clients.whatsapp.cloud import WhatsappCloud
from kairon.exceptions import AppException
from kairon.shared.rest_client import AioRestClient
from kairon.shared.utils import Utility
from kairon.chat.handlers.channels.clients.whatsapp.cloud import WhatsappCloud

logger = logging.getLogger(__name__)

Expand All @@ -14,69 +13,62 @@

class WhatsappOnPremise(WhatsappCloud):

def __init__(self, access_token, **kwargs):
def __init__(self, access_token, session, **kwargs):
"""
@required:
access_token
@optional:
session
client
api_version
app_secret
"""
super().__init__(access_token, **kwargs)
super().__init__(access_token, session, **kwargs)
self.access_token = access_token
self.session = kwargs.get('session', requests.Session())
self.session = session
self.client = kwargs.get('session', AioRestClient(False))

def send_action(self, payload, timeout=None, **kwargs):
async def send_action(self, payload, timeout=None, **kwargs):
"""
@required:
payload: message request payload
@optional:
timeout: request timeout
@outputs: response json
"""
r = self.session.post(
'{app}/messages'.format(app=self.app),
headers=self.auth_args,
json=payload,
timeout=timeout
)
resp = r.json()
url = '{app}/messages'.format(app=self.app)
response = await self.session.request("POST", url, headers=self.auth_args, request_body=payload, timeout=timeout,
return_json=False)
resp = await response.text()
logger.debug(resp)
return resp

def get_attachment(self, media_id, timeout=None):
async def get_attachment(self, media_id, timeout=None):
"""
@required:
media_id: audio/video/image/document id
@optional:
timeout: request timeout
@outputs: response json
"""
r = self.session.get(
'{app}/media/{media_id}'.format(app=self.app, media_id=media_id),
headers=self.auth_args,
timeout=timeout
)
resp = r.json()
url = '{app}/media/{media_id}'.format(app=self.app, media_id=media_id)
response = await self.session.request("GET", url, headers=self.auth_args, timeout=timeout,
return_json=False)
resp = await response.text()
logger.debug(resp)
return resp

def mark_as_read(self, msg_id, timeout=None):
async def mark_as_read(self, msg_id, timeout=None):
payload = {
"status": "read"
}
r = self.session.put(
'{app}/messages/{message_id}'.format(app=self.app, message_id=msg_id),
headers=self.auth_args,
json=payload,
timeout=timeout
)
resp = r.json()
url = '{app}/messages/{message_id}'.format(app=self.app, message_id=msg_id)
response = await self.session.request("PUT", url, headers=self.auth_args, request_body=payload, timeout=timeout,
return_json=False)
resp = await response.text()
logger.debug(resp)
return resp

def send_template_message(self, name: Text, to_phone_number, language_code: Text = "en", components: Dict = None, namespace: Text = None):
async def send_template_message(self, name: Text, to_phone_number, language_code: Text = "en", components: Dict = None, namespace: Text = None):
if Utility.check_empty_string(namespace):
raise AppException("namespace is required to send messages using on-premises api!")

Expand All @@ -90,4 +82,5 @@ def send_template_message(self, name: Text, to_phone_number, language_code: Text
}
if components:
payload.update({"components": components})
return self.send(payload, to_phone_number, messaging_type="template")
response = await self.send(payload, to_phone_number, messaging_type="template")
return response
32 changes: 18 additions & 14 deletions kairon/chat/handlers/channels/msteams.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
from kairon import Utility
from kairon.chat.agent_processor import AgentProcessor
from kairon.chat.converters.channels.response_factory import ConverterFactory
from kairon.chat.converters.channels.responseconverter import ElementTransformerOps
from kairon.chat.handlers.channels.base import ChannelHandlerBase
from kairon.shared.chat.processor import ChatDataProcessor
from kairon.shared.constants import ChannelTypes
from kairon.shared.models import User
from kairon.shared.rest_client import AioRestClient


class MSTeamBot(OutputChannel):
Expand Down Expand Up @@ -50,6 +50,15 @@ def __init__(
self.conversation = conversation
self.global_uri = f"{service_url}v3/"
self.bot = bot
self.client = AioRestClient(False)

async def make_request(self, method, url, request_body=None, headers=None, timeout=None, is_streaming_resp=False):
response = await self.client.request(
method, url, request_body=request_body, headers=headers, timeout=timeout,
return_json=False)
resp = await response.text()
logger.debug(resp)
return resp

async def _get_headers(self, refetch=False) -> Optional[Dict[Text, Any]]:
ms_oauthurl = Utility.system_metadata["channels"][ChannelTypes.MSTEAMS.value]["MICROSOFT_OAUTH2_URL"]
Expand All @@ -65,12 +74,11 @@ async def _get_headers(self, refetch=False) -> Optional[Dict[Text, Any]]:
'scope': scope,
}

token_response = requests.post(uri, data=payload)
token_response = await self.make_request("POST", uri, request_body=payload)

if token_response.ok:
token_data = token_response.json()
access_token = token_data["access_token"]
token_expiration = token_data["expires_in"]
if token_response.status_code == 200:
access_token = token_response["access_token"]
token_expiration = token_response["expires_in"]

delta = datetime.timedelta(seconds=int(token_expiration))
MSTeamBot.token_expiration_date = datetime.datetime.now() + delta
Expand Down Expand Up @@ -104,24 +112,20 @@ async def send(self, message_data: Dict[Text, Any]) -> None:
self.global_uri, self.conversation["id"]
)
headers = await self._get_headers()
send_response = requests.post(
post_message_uri,headers=headers, data=json.dumps(message_data)
)
send_response = await self.make_request("POST", post_message_uri, request_body=json.dumps(message_data), headers=headers)

if send_response.status_code == 403:
headers = await self._get_headers(True)
send_response = requests.post(
post_message_uri, headers=headers, data=json.dumps(message_data)
)
send_response = await self.make_request("POST", post_message_uri, request_body=json.dumps(message_data),
headers=headers)

if not send_response.ok:
if not send_response != 200:
logger.error(
"Error trying to send botframework messge. Response: %s",
send_response.text,
)
raise Exception(f"Exception while responding to MSTeams:: {send_response.text} and status::{send_response.status_code}")


async def send_text_message(
self, recipient_id: Text, text: Text, **kwargs: Any
) -> None:
Expand Down
Loading

0 comments on commit 729d0a5

Please sign in to comment.