Skip to content

Commit

Permalink
Chat server migration (#1008)
Browse files Browse the repository at this point in the history
* 1. added default tabname if not exists in metadata
2. aggregated session conversation by tabname

* 1. returned tabname with chat response
2. added metadata check while retrieving action name in history
3. updated razorpay action to convert amount into int and null checks

* Chat server migration
1. migration to fastapi
2. added lockstore
3. unit and integration tests

* fixed test failure post merge

* fixed test failure post merge

* fixed merge conflicts and added tests

* fixed merge conflicts and added tests
  • Loading branch information
udit-pandey authored Aug 31, 2023
1 parent b561bc4 commit 31729f5
Show file tree
Hide file tree
Showing 40 changed files with 2,859 additions and 2,424 deletions.
3 changes: 2 additions & 1 deletion docker/Dockerfile_chat
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ COPY email.yaml /app/

ENV HF_HOME="/home/cache"

ENV APP_MODULE=kairon.api.server:app
EXPOSE 5000
CMD ["python","-m", "kairon.chat.server","--logging=debug"]
CMD uvicorn ${APP_MODULE} --host 0.0.0.0 --port 5000 --no-server-header

13 changes: 12 additions & 1 deletion kairon/actions/definitions/razorpay.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
username = action_config.get('username')
email = action_config.get('email')
contact = action_config.get('contact')
body = {}
try:
tracker_data = ActionUtility.build_context(tracker)
api_key = ActionUtility.retrieve_value_for_custom_action_parameter(tracker_data, api_key, self.bot)
api_secret = ActionUtility.retrieve_value_for_custom_action_parameter(tracker_data, api_secret, self.bot)
amount = ActionUtility.retrieve_value_for_custom_action_parameter(tracker_data, amount, self.bot)
if not amount:
raise ActionFailure(f"amount must be a whole number! Got {amount}.")
amount = int(amount)
currency = ActionUtility.retrieve_value_for_custom_action_parameter(tracker_data, currency, self.bot)
username = ActionUtility.retrieve_value_for_custom_action_parameter(tracker_data, username, self.bot)
email = ActionUtility.retrieve_value_for_custom_action_parameter(tracker_data, email, self.bot)
Expand All @@ -78,6 +82,12 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
headers=headers, http_url=ActionRazorpay.__URL, request_method="POST", request_body=body
)
bot_response = http_response["short_url"]
except ValueError as e:
logger.exception(e)
logger.debug(e)
exception = f"amount must be a whole number! Got {amount}."
status = "FAILURE"
bot_response = "I have failed to process your request"
except Exception as e:
logger.exception(e)
logger.debug(e)
Expand All @@ -95,7 +105,8 @@ async def execute(self, dispatcher: CollectingDispatcher, tracker: Tracker, doma
api_response=str(http_response),
bot_response=bot_response,
status=status,
user_msg=tracker.latest_message.get('text')
user_msg=tracker.latest_message.get('text'),
request=body
).save()
dispatcher.utter_message(bot_response)
return {KaironSystemSlots.kairon_action_response.value: bot_response}
16 changes: 9 additions & 7 deletions kairon/api/app/routers/bot/bot.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from datetime import date, datetime
from typing import List, Optional
from typing import List, Optional, Dict, Text
from fastapi import APIRouter, BackgroundTasks, Path, Security, Request
from fastapi import File, UploadFile
from fastapi.responses import FileResponse
Expand Down Expand Up @@ -1163,12 +1163,14 @@ async def get_chat_client_config_url(
return Response(data=url)


@router.get("/chat/client/config/{uid}", response_model=Response)
async def get_client_config_using_uid(request: Request, bot: str, uid: str):
config = mongo_processor.get_client_config_using_uid(bot, uid)
if not Utility.validate_request(request, config):
return Response(message="Domain not registered for kAIron client", error_code=403, success=False)
config['config'].pop("whitelist")
@router.get("/chat/client/config/{token}", response_model=Response)
async def get_client_config_using_uid(
request: Request, bot: Text = Path(description="Bot id"),
token: Text = Path(description="Token generated from api server"),
token_claims: Dict = Security(Authentication.validate_bot_specific_token, scopes=TESTER_ACCESS)
):
config = mongo_processor.get_client_config_using_uid(bot, token_claims)
config = Utility.validate_domain(request, config)
return Response(data=config['config'])


Expand Down
3 changes: 2 additions & 1 deletion kairon/chat/agent/message_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ async def handle_message(
self, message: UserMessage
):
"""Handle a single message with this processor."""
response = {"nlu": None, "action": None, "response": None, "slots": None, "events": None}
tabname = message.metadata.get("tabname", "default")
response = {"nlu": None, "action": None, "response": None, "slots": None, "events": None, "tabname": tabname}

# preprocess message if necessary
tracker, intent_predictions = await self.log_message(message, should_save_tracker=False)
Expand Down
5 changes: 4 additions & 1 deletion kairon/chat/agent_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from loguru import logger as logging
from rasa.core.agent import Agent
from rasa.core.lock_store import LockStore

from kairon.chat.cache import AgentCache
from kairon.exceptions import AppException
Expand Down Expand Up @@ -45,10 +46,12 @@ def reload(bot: Text):
bot, raise_exception=False
)
action_endpoint = Utility.get_action_url(endpoint)
lock_store_endpoint = LockStore.create(Utility.get_lock_store_url(bot))
model_path = Utility.get_latest_model(bot)
domain = AgentProcessor.mongo_processor.load_domain(bot)
mongo_store = Utility.get_local_mongo_store(bot, domain)
agent = KaironAgent.load(model_path, action_endpoint=action_endpoint, tracker_store=mongo_store)
agent = KaironAgent.load(model_path, action_endpoint=action_endpoint, tracker_store=mongo_store,
lock_store=lock_store_endpoint)
AgentProcessor.cache_provider.set(bot, agent)
except Exception as e:
logging.exception(e)
Expand Down
139 changes: 0 additions & 139 deletions kairon/chat/handlers/action.py

This file was deleted.

10 changes: 10 additions & 0 deletions kairon/chat/handlers/channels/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from abc import ABC


class ChannelHandlerBase(ABC):

async def validate(self):
raise NotImplementedError("Provider not implemented")

async def handle_message(self):
raise NotImplementedError("Provider not implemented")
26 changes: 26 additions & 0 deletions kairon/chat/handlers/channels/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Text

from kairon.chat.handlers.channels.hangouts import HangoutsHandler
from kairon.chat.handlers.channels.messenger import MessengerHandler, InstagramHandler
from kairon.chat.handlers.channels.msteams import MSTeamsHandler
from kairon.chat.handlers.channels.slack import SlackHandler
from kairon.chat.handlers.channels.telegram import TelegramHandler
from kairon.chat.handlers.channels.whatsapp import WhatsappHandler
from kairon.shared.constants import ChannelTypes


class ChannelHandlerFactory:

__implementations = {
ChannelTypes.WHATSAPP.value: WhatsappHandler,
ChannelTypes.HANGOUTS.value: HangoutsHandler,
ChannelTypes.SLACK.value: SlackHandler,
ChannelTypes.MESSENGER.value: MessengerHandler,
ChannelTypes.MSTEAMS.value: MSTeamsHandler,
ChannelTypes.TELEGRAM.value: TelegramHandler,
ChannelTypes.INSTAGRAM.value: InstagramHandler
}

@staticmethod
def get_handler(channel: Text):
return ChannelHandlerFactory.__implementations[channel]
39 changes: 22 additions & 17 deletions kairon/chat/handlers/channels/hangouts.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import requests
from google.oauth2 import id_token
from rasa.core.channels.channel import InputChannel, OutputChannel, UserMessage
from tornado.escape import json_encode, json_decode
from starlette.requests import Request
from tornado.escape import json_encode

from kairon.chat.agent_processor import AgentProcessor
from kairon.chat.handlers.channels.base import ChannelHandlerBase
from kairon.shared.chat.processor import ChatDataProcessor
from kairon.shared.constants import ChannelTypes
from kairon.shared.tornado.handlers.base import BaseHandler
from kairon.shared.models import User
from kairon import Utility
from kairon.chat.converters.channels.response_factory import ConverterFactory

Expand Down Expand Up @@ -188,7 +190,7 @@ async def send_custom_json(
message_type = json_message.get("type")
type_list = Utility.system_metadata.get("type_list")
if message_type is not None and message_type in type_list:
converter_instance = ConverterFactory.getConcreteInstance(message_type, ChannelTypes.HANGOUT.value)
converter_instance = ConverterFactory.getConcreteInstance(message_type, ChannelTypes.HANGOUTS.value)
response = await converter_instance.messageConverter(message)
await self._persist_message(response)
else:
Expand All @@ -198,7 +200,7 @@ async def send_custom_json(


# Google Hangouts input channel
class HangoutHandler(InputChannel, BaseHandler):
class HangoutsHandler(InputChannel, ChannelHandlerBase):
"""
Channel that uses Google Hangouts Chat API to communicate.
"""
Expand All @@ -211,6 +213,11 @@ class HangoutHandler(InputChannel, BaseHandler):
session=cached_session
)

def __init__(self, bot: Text, user: User, request: Request):
self.bot = bot
self.user = user
self.request = request

@staticmethod
def _extract_sender(request_data: Dict) -> Text:

Expand Down Expand Up @@ -269,14 +276,13 @@ def _check_token(self, bot_token: Text, project_id: Text) -> None:
if decoded_token["iss"] != "[email protected]":
raise Exception(401)

async def get(self, bot: str, token: str):
self.write(json_encode({"status": "ok"}))
async def validate(self):
return {"status": "ok"}

async def post(self, bot: str, token: str):
user = super().authenticate_channel(token, bot, self.request)
hangout = ChatDataProcessor.get_channel_config("hangouts", bot=bot, mask_characters=False)
async def handle_message(self):
hangout = ChatDataProcessor.get_channel_config("hangouts", bot=self.bot, mask_characters=False)
project_id = hangout['config']['project_id']
request_data = json_decode(self.request.body)
request_data = await self.request.json()
if project_id:
token = self.request.headers.get("Authorization").replace("Bearer ", "")
self._check_token(token, project_id)
Expand All @@ -285,16 +291,16 @@ async def post(self, bot: str, token: str):
room_name = self._extract_room(request_data)
text = self._extract_message(request_data)
if text is None:
self.write("OK")
return
return {"status": "OK"}

input_channel = self._extract_input_channel()

collector = HangoutsOutput()

try:
metadata = {"is_integration_user": True, "bot": bot, "account": user.account, "room": room_name,
"out_channel": collector.name(), "channel_type": "hangouts"}
await AgentProcessor.get_agent(bot).handle_message(UserMessage(
metadata = {"is_integration_user": True, "bot": self.bot, "account": self.user.account, "room": room_name,
"out_channel": collector.name(), "channel_type": "hangouts", "tabname": "default"}
await AgentProcessor.get_agent(self.bot).handle_message(UserMessage(
text,
collector,
sender_id,
Expand All @@ -311,5 +317,4 @@ async def post(self, bot: str, token: str):
f"text: {text}"
)

self.write(json_encode(collector.messages))
return
return json_encode(collector.messages)
Loading

0 comments on commit 31729f5

Please sign in to comment.