Skip to content

Commit

Permalink
Retry Broadcast (#1359)
Browse files Browse the repository at this point in the history
* Added code related to Resend Broadcast.
Fixed and added test cases related to the same.

* Made changes related to Resend Broadcast and added tests related to the same.

* Made changes related to Resend Broadcast and added tests related to the same.

* Made changes related to Resend Broadcast and added tests related to the same.

* Resolved merge conflicts and Fixed tests.

* Added campaign metrics related changes and added test cases for the same.

* Added campaign metrics related changes and added test cases for the same.

* Added campaign metrics related changes and added test cases for the same and changed system.yaml

* Added campaign metrics related changes and added test cases for the same and changed system.yaml
  • Loading branch information
maheshsattala authored Jul 19, 2024
1 parent f3cbc9a commit 2a1b4dd
Show file tree
Hide file tree
Showing 16 changed files with 2,366 additions and 62 deletions.
14 changes: 14 additions & 0 deletions kairon/api/app/routers/bot/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,20 @@ async def add_message_broadcast_event(
return Response(message="Broadcast added!", data={"msg_broadcast_id": notification_id})


@router.post("/broadcast/message/resend/{msg_broadcast_id}", response_model=Response)
async def resend_message_broadcast_event(
msg_broadcast_id: str,
current_user: User = Security(Authentication.get_current_user_and_bot, scopes=DESIGNER_ACCESS)
):
"""
Resends a scheduled message broadcast.
"""
event = MessageBroadcastEvent(current_user.get_bot(), current_user.get_user())
event.validate_retry_broadcast(event_id=msg_broadcast_id)
event.enqueue(EventRequestType.resend_broadcast.value, msg_broadcast_id=msg_broadcast_id)
return Response(message="Resending Broadcast!")


@router.put("/broadcast/message/{msg_broadcast_id}", response_model=Response)
async def update_message_broadcast_event(
msg_broadcast_id: str, request: MessageBroadcastRequest,
Expand Down
43 changes: 34 additions & 9 deletions kairon/events/definitions/message_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ def validate(self):
len(list(MessageBroadcastProcessor.list_settings(self.bot, timestamp__gt=date_today))):
raise AppException("Notification scheduling limit reached!")

def validate_retry_broadcast(self, event_id: Text):
bot_settings = MongoProcessor.get_bot_settings(self.bot, self.user)
config = MessageBroadcastProcessor.get_settings(event_id, self.bot, is_resend=True)

if bot_settings['retry_broadcasting_limit'] <= config["retry_count"]:
raise AppException("Retry Broadcasting limit reached!")

def execute(self, event_id: Text, **kwargs):
"""
Execute the event.
Expand All @@ -50,18 +57,24 @@ def execute(self, event_id: Text, **kwargs):
reference_id = None
status = EVENT_STATUS.FAIL.value
exception = None
is_resend = kwargs.get('is_resend', False)
try:
config, reference_id = self.__retrieve_config(event_id)
broadcast = MessageBroadcastFactory.get_instance(config["connector_type"]).from_config(config, event_id, reference_id)
recipients = broadcast.get_recipients()
broadcast.send(recipients)
config, reference_id = self.__retrieve_config(event_id, is_resend)
broadcast = MessageBroadcastFactory.get_instance(config["connector_type"]).from_config(config, event_id,
reference_id)
if is_resend:
config = broadcast.resend_broadcast()
else:
recipients = broadcast.get_recipients()
broadcast.send(recipients)
status = EVENT_STATUS.COMPLETED.value
except Exception as e:
logger.exception(e)
exception = str(e)
finally:
time.sleep(5)
MessageBroadcastProcessor.insert_status_received_on_channel_webhook(reference_id, config["name"])
MessageBroadcastProcessor.insert_status_received_on_channel_webhook(reference_id, config["name"],
config["retry_count"])
MessageBroadcastProcessor.add_event_log(
self.bot, MessageBroadcastLogType.common.value, reference_id, status=status, exception=exception
)
Expand Down Expand Up @@ -99,6 +112,16 @@ def _add_schedule(self, config: Dict):
MessageBroadcastProcessor.delete_task(msg_broadcast_id, self.bot)
raise AppException(e)

def _resend_broadcast(self, msg_broadcast_id: Text):
try:
payload = {'bot': self.bot, 'user': self.user,
"event_id": msg_broadcast_id, "is_resend": True}
Utility.request_event_server(EventClass.message_broadcast, payload)
return msg_broadcast_id
except Exception as e:
logger.error(e)
raise e

def _update_schedule(self, msg_broadcast_id: Text, config: Dict):
settings_updated = False
current_settings = {}
Expand Down Expand Up @@ -130,13 +153,15 @@ def delete_schedule(self, msg_broadcast_id: Text):
logger.error(e)
raise e

def __retrieve_config(self, event_id: Text):
reference_id = ObjectId().__str__()
config = MessageBroadcastProcessor.get_settings(event_id, self.bot)
def __retrieve_config(self, event_id: Text, is_resend: bool):

reference_id = MessageBroadcastProcessor.get_reference_id_from_broadcasting_logs(event_id) \
if is_resend else ObjectId().__str__()
config = MessageBroadcastProcessor.get_settings(event_id, self.bot, is_resend=is_resend)
bot_settings = MongoProcessor.get_bot_settings(self.bot, self.user)
config["pyscript_timeout"] = bot_settings["dynamic_broadcast_execution_timeout"]
MessageBroadcastProcessor.add_event_log(
self.bot, MessageBroadcastLogType.common.value, reference_id, user=self.user, config=config,
status=EVENT_STATUS.INPROGRESS.value, event_id=event_id, is_new_log=True
status=EVENT_STATUS.INPROGRESS.value, event_id=event_id, is_new_log=True, is_resend=is_resend
)
return config, reference_id
7 changes: 6 additions & 1 deletion kairon/events/definitions/scheduled_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def enqueue(self, event_request_type: Text, **kwargs):
request_implementation = {
EventRequestType.trigger_async.value: self._trigger_async,
EventRequestType.add_schedule.value: self._add_schedule,
EventRequestType.update_schedule.value: self._update_schedule
EventRequestType.update_schedule.value: self._update_schedule,
EventRequestType.resend_broadcast.value: self._resend_broadcast
}
if event_request_type not in request_implementation.keys():
raise AppException(f"'{event_request_type}' is not a valid event server request!")
Expand All @@ -34,6 +35,10 @@ def _add_schedule(self, config: Dict):
def _update_schedule(self, msg_broadcast_id: Text, config: Dict):
raise NotImplementedError("Provider not implemented")

@abstractmethod
def _resend_broadcast(self, msg_broadcast_id: Text):
raise NotImplementedError("Provider not implemented")

@abstractmethod
def delete_schedule(self, msg_broadcast_id: Text):
raise NotImplementedError("Provider not implemented")
62 changes: 58 additions & 4 deletions kairon/shared/channels/broadcast/whatsapp.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime

import ujson as json
from typing import List, Text, Dict

Expand Down Expand Up @@ -47,7 +49,7 @@ def send(self, recipients: List, **kwargs):
else:
self.__send_using_pyscript()

def __send_using_pyscript(self):
def __send_using_pyscript(self, **kwargs):
from kairon.shared.concurrency.orchestrator import ActorOrchestrator

script = self.config['pyscript']
Expand All @@ -62,7 +64,8 @@ def send_msg(template_id: Text, recipient, language_code: Text = "en", component
MessageBroadcastProcessor.add_event_log(
self.bot, MessageBroadcastLogType.send.value, self.reference_id, api_response=response,
status=status, recipient=recipient, template_params=components, template=raw_template,
event_id=self.event_id, template_name=template_id
event_id=self.event_id, template_name=template_id, language_code=language_code, namespace=namespace,
retry_count=0
)

return response
Expand Down Expand Up @@ -107,21 +110,72 @@ def __send_using_configuration(self, recipients: List):
for recipient, t_params in zip(recipients, template_params):
recipient = str(recipient) if recipient else ""
if not Utility.check_empty_string(recipient):
response = channel_client.send_template_message(template_id, recipient, lang, t_params, namespace=namespace)
response = channel_client.send_template_message(template_id, recipient, lang, t_params,
namespace=namespace)
status = "Failed" if response.get("errors") else "Success"
if status == "Failed":
failure_cnt = failure_cnt + 1

MessageBroadcastProcessor.add_event_log(
self.bot, MessageBroadcastLogType.send.value, self.reference_id, api_response=response,
status=status, recipient=recipient, template_params=t_params, template=raw_template,
event_id=self.event_id, template_name=template_id
event_id=self.event_id, template_name=template_id, language_code=lang, namespace=namespace,
retry_count=0
)
MessageBroadcastProcessor.add_event_log(
self.bot, MessageBroadcastLogType.common.value, self.reference_id, failure_cnt=failure_cnt, total=total,
event_id=self.event_id, **evaluation_log
)

def resend_broadcast(self):
config = MessageBroadcastProcessor.get_settings(self.event_id, self.bot, is_resend=True)
retry_count = config["retry_count"]

message_broadcast_logs = MessageBroadcastProcessor.extract_message_ids_from_broadcast_logs(
self.reference_id, retry_count=retry_count
)

broadcast_logs = [log for log in message_broadcast_logs.values() if log["errors"]]
codes_to_exclude = Utility.environment["channels"]["360dialog"]["error_codes"]
required_logs = [log for log in broadcast_logs if log["errors"][0]["code"] not in codes_to_exclude]
channel_client = self.__get_client()
retry_count += 1
failure_cnt = 0
total = len(required_logs)
skipped_count = len(broadcast_logs) - total

for log in required_logs:
template_id = log["template_name"]
namespace = log["namespace"]
language_code = log["language_code"]
components = log["template_params"]
recipient = log["recipient"]
template = log["template"]
response = channel_client.send_template_message(template_id, recipient, language_code, components,
namespace)
status = "Failed" if response.get("error") else "Success"
if status == "Failed":
failure_cnt = failure_cnt + 1

MessageBroadcastProcessor.add_event_log(
self.bot, MessageBroadcastLogType.resend.value, self.reference_id, api_response=response,
status=status, recipient=recipient, template_params=components, template=template,
event_id=self.event_id, template_name=template_id, language_code=language_code, namespace=namespace,
retry_count=retry_count,
)
kwargs = {
f"resend_count_{retry_count}": total,
f"skipped_count_{retry_count}": skipped_count,
f"retry_{retry_count}_timestamp": datetime.utcnow()
}
MessageBroadcastProcessor.add_event_log(
self.bot, MessageBroadcastLogType.common.value, self.reference_id, **kwargs
)
config = MessageBroadcastProcessor.update_retry_count(self.event_id, self.bot, self.user,
retry_count=retry_count)

return config

def __get_client(self):
try:
bot_settings = MongoProcessor.get_bot_settings(self.bot, self.user)
Expand Down
2 changes: 2 additions & 0 deletions kairon/shared/chat/broadcast/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
class MessageBroadcastLogType(str, Enum):
common = "common"
send = "send"
resend = "resend"
self = "self"
script_variables = "script_variables"


class MessageBroadcastType(str, Enum):
static = "static"
dynamic = "dynamic"

1 change: 1 addition & 0 deletions kairon/shared/chat/broadcast/data_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class MessageBroadcastSettings(Auditlog):
recipients_config = EmbeddedDocumentField(RecipientsConfiguration)
template_config = ListField(EmbeddedDocumentField(TemplateConfiguration))
pyscript = StringField()
retry_count = IntField(default=0)
bot = StringField(required=True)
user = StringField(required=True)
status = BooleanField(default=True)
Expand Down
Loading

0 comments on commit 2a1b4dd

Please sign in to comment.