Skip to content

Commit

Permalink
LCFS - Implement Email Notification Triggers in Backend for Subscribe…
Browse files Browse the repository at this point in the history
…d Users #1226
  • Loading branch information
prv-proton committed Dec 9, 2024
1 parent 8e4aa91 commit 72f6b91
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 28 deletions.
22 changes: 22 additions & 0 deletions backend/lcfs/web/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,25 @@ async def lcfs_cache_key_builder(

# Return the cache key
return cache_key

class NotificationTypeEnum(Enum):
BCEID__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT = "BCEID__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT"
BCEID__INITIATIVE_AGREEMENT__DIRECTOR_APPROVAL = "BCEID__INITIATIVE_AGREEMENT__DIRECTOR_APPROVAL"
BCEID__TRANSFER__DIRECTOR_DECISION = "BCEID__TRANSFER__DIRECTOR_DECISION"
BCEID__TRANSFER__PARTNER_ACTIONS = "BCEID__TRANSFER__PARTNER_ACTIONS"
IDIR_ANALYST__COMPLIANCE_REPORT__DIRECTOR_DECISION = "IDIR_ANALYST__COMPLIANCE_REPORT__DIRECTOR_DECISION"
IDIR_ANALYST__COMPLIANCE_REPORT__MANAGER_RECOMMENDATION = "IDIR_ANALYST__COMPLIANCE_REPORT__MANAGER_RECOMMENDATION"
IDIR_ANALYST__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW = "IDIR_ANALYST__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW"
IDIR_ANALYST__INITIATIVE_AGREEMENT__RETURNED_TO_ANALYST = "IDIR_ANALYST__INITIATIVE_AGREEMENT__RETURNED_TO_ANALYST"
IDIR_ANALYST__TRANSFER__DIRECTOR_RECORDED = "IDIR_ANALYST__TRANSFER__DIRECTOR_RECORDED"
IDIR_ANALYST__TRANSFER__RESCINDED_ACTION = "IDIR_ANALYST__TRANSFER__RESCINDED_ACTION"
IDIR_ANALYST__TRANSFER__SUBMITTED_FOR_REVIEW = "IDIR_ANALYST__TRANSFER__SUBMITTED_FOR_REVIEW"
IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__ANALYST_RECOMMENDATION = "IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__ANALYST_RECOMMENDATION"
IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT = "IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT"
IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW = "IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW"
IDIR_DIRECTOR__COMPLIANCE_REPORT__MANAGER_RECOMMENDATION = "IDIR_DIRECTOR__COMPLIANCE_REPORT__MANAGER_RECOMMENDATION"
IDIR_DIRECTOR__INITIATIVE_AGREEMENT__ANALYST_RECOMMENDATION = "IDIR_DIRECTOR__INITIATIVE_AGREEMENT__ANALYST_RECOMMENDATION"
IDIR_DIRECTOR__TRANSFER__ANALYST_RECOMMENDATION = "IDIR_DIRECTOR__TRANSFER__ANALYST_RECOMMENDATION"

def __str__(self):
return self.value
52 changes: 45 additions & 7 deletions backend/lcfs/web/api/compliance_report/update_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from fastapi import Depends, HTTPException, Request
from lcfs.web.api.base import NotificationTypeEnum
from lcfs.web.api.notification.schema import (
COMPLIANCE_REPORT_STATUS_NOTIFICATION_MAPPER,
NotificationMessageSchema,
NotificationRequestSchema,
)
from lcfs.web.api.notification.services import NotificationService
from sqlalchemy.exc import InvalidRequestError

from lcfs.db.models.compliance.ComplianceReport import ComplianceReport
Expand All @@ -24,12 +31,14 @@ def __init__(
summary_service: ComplianceReportSummaryService = Depends(),
org_service: OrganizationsService = Depends(OrganizationsService),
trx_service: TransactionsService = Depends(TransactionsService),
notfn_service: NotificationService = Depends(NotificationService),
):
self.repo = repo
self.request = request
self.summary_service = summary_service
self.org_service = org_service
self.trx_service = trx_service
self.notfn_service = notfn_service

async def update_compliance_report(
self, report_id: int, report_data: ComplianceReportUpdateSchema
Expand All @@ -42,15 +51,34 @@ async def update_compliance_report(
f"Compliance report with ID {report_id} not found"
)

notifications = None
notification_data: NotificationMessageSchema = NotificationMessageSchema(
message=f"Compliance report {report.compliance_report_id} has been updated",
related_organization_id=report.organization_id,
origin_user_profile_id=self.request.user.user_profile_id,
)
# if we're just returning the compliance report back to either compliance manager or analyst,
# then neither history nor any updates to summary is required.
if report_data.status in RETURN_STATUSES:
status_has_changed = False
report_data.status = (
"Submitted"
if report_data.status == "Return to analyst"
else ComplianceReportStatusEnum.Recommended_by_analyst.value
notifications = COMPLIANCE_REPORT_STATUS_NOTIFICATION_MAPPER.get(
report_data.status
)
if report_data.status == "Return to analyst":
report_data.status = ComplianceReportStatusEnum.Submitted.value
notification_data.message = f"Compliance report {report.compliance_report_id} has been returned to analyst"
else:
report_data.status = (
ComplianceReportStatusEnum.Recommended_by_analyst.value
)

notification_data.message = f"Compliance report {report.compliance_report_id} has been returned by director"
notification_data.related_user_profile_id = [
h.user_profile.user_profile_id
for h in report.history
if h.status.status
== ComplianceReportStatusEnum.Recommended_by_analyst
][0]
else:
status_has_changed = report.current_status.status != getattr(
ComplianceReportStatusEnum, report_data.status.replace(" ", "_")
Expand All @@ -65,10 +93,21 @@ async def update_compliance_report(
updated_report = await self.repo.update_compliance_report(report)
if status_has_changed:
await self.handle_status_change(report, new_status.status)

notification_data.message = (
f"Compliance report {report.compliance_report_id} has been updated"
)
notifications = COMPLIANCE_REPORT_STATUS_NOTIFICATION_MAPPER.get(
new_status.status
)
# Add history record
await self.repo.add_compliance_report_history(report, self.request.user)

if notifications and isinstance(notifications, list):
await self.notfn_service.send_notification(
NotificationRequestSchema(
notification_types=notifications,
notification_data=notification_data,
)
)
return updated_report

async def handle_status_change(
Expand Down Expand Up @@ -182,7 +221,6 @@ async def handle_submitted_status(self, report: ComplianceReport):
# Update the report with the new summary
report.summary = new_summary


if report.summary.line_20_surplus_deficit_units != 0:
# Create a new reserved transaction for receiving organization
report.transaction = await self.org_service.adjust_balance(
Expand Down
6 changes: 4 additions & 2 deletions backend/lcfs/web/api/email/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
)
from lcfs.web.core.decorators import repo_handler
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, or_
from sqlalchemy import func, select, or_
from typing import List
from lcfs.db.models.user import UserProfile
from lcfs.db.dependencies import get_async_db_session
Expand All @@ -26,7 +26,9 @@ async def get_subscribed_user_emails(
Ignores In-Application notification types.
"""
query = (
select(UserProfile.email)
select(
func.coalesce(UserProfile.notifications_email, UserProfile.email).label("email")
)
.join(NotificationChannelSubscription)
.join(NotificationChannelSubscription.notification_channel)
.filter(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
from typing import Optional
from lcfs.web.api.base import NotificationTypeEnum
from pydantic import BaseModel, Field

class EmailNotificationRequest(BaseModel):
notification_type: NotificationTypeEnum = Field(..., description="Type of notification")
organization_id: Optional[int] = Field(None, description="Organization ID associated with the notification")

TEMPLATE_MAPPING = {
"BCEID__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT": "bceid__compliance_report__director_assessment.html",
"BCEID__INITIATIVE_AGREEMENT__DIRECTOR_APPROVAL": "bceid__initiative_agreement__director_approval.html",
Expand Down
11 changes: 6 additions & 5 deletions backend/lcfs/web/api/email/services.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from lcfs.web.api.base import NotificationTypeEnum
import requests
import structlog
from fastapi import Depends
Expand All @@ -9,7 +10,7 @@
from lcfs.settings import settings
from lcfs.web.api.email.repo import CHESEmailRepository
from lcfs.web.core.decorators import service_handler
from lcfs.web.api.email.template_mapping import TEMPLATE_MAPPING
from lcfs.web.api.email.schema import TEMPLATE_MAPPING

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -54,7 +55,7 @@ def _validate_configuration(self):
@service_handler
async def send_notification_email(
self,
notification_type: str,
notification_type: NotificationTypeEnum,
notification_context: Dict[str, Any],
organization_id: int,
) -> bool:
Expand All @@ -63,16 +64,16 @@ async def send_notification_email(
"""
# Retrieve subscribed user emails
recipient_emails = await self.repo.get_subscribed_user_emails(
notification_type, organization_id
notification_type.value, organization_id
)
if not recipient_emails:
logger.info(f"""No subscribers for notification type: {
notification_type}""")
notification_type.value}""")
return False

# Render the email content
email_body = self._render_email_template(
notification_type, notification_context
notification_type.value, notification_context
)

# Build email payload
Expand Down
15 changes: 10 additions & 5 deletions backend/lcfs/web/api/email/views.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from fastapi import APIRouter, Depends, status, Request
from fastapi import APIRouter, Depends, HTTPException, status, Request
from typing import Dict
from lcfs.web.api.email.schema import EmailNotificationRequest
from lcfs.web.api.email.services import CHESEmailService
from lcfs.web.core.decorators import view_handler

Expand All @@ -15,6 +16,7 @@
@view_handler(["*"])
async def test_email_notification(
request: Request,
payload: EmailNotificationRequest,
service: CHESEmailService = Depends(),
):
"""
Expand All @@ -28,9 +30,9 @@ async def test_email_notification(
"message_body": "This is a test notification email from LCFS Notification System.",
}

# Notification type and organization for testing
notification_type = "INITIATIVE_APPROVED"
organization_id = None # Replace with a valid organization ID for testing
# Extract notification type and organization ID from the request payload
notification_type = payload.notification_type
organization_id = payload.organization_id

# Trigger the email sending process
success = await service.send_notification_email(
Expand All @@ -42,4 +44,7 @@ async def test_email_notification(
if success:
return {"status": "success", "message": "Test email sent successfully."}
else:
return {"status": "failure", "message": "Failed to send test email."}
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to send test email."
)
34 changes: 31 additions & 3 deletions backend/lcfs/web/api/initiative_agreement/services.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
from lcfs.web.api.notification.schema import (
INITIATIVE_AGREEMENT_STATUS_NOTIFICATION_MAPPER,
NotificationMessageSchema,
NotificationRequestSchema,
)
from lcfs.web.api.notification.services import NotificationService
import structlog
from datetime import datetime
from fastapi import Depends, Request, HTTPException
Expand Down Expand Up @@ -34,12 +40,14 @@ def __init__(
internal_comment_service: InternalCommentService = Depends(
InternalCommentService
),
notfn_service: NotificationService = Depends(NotificationService),
request: Request = None,
) -> None:
self.repo = repo
self.org_service = org_service
self.internal_comment_service = internal_comment_service
self.request = request
self.notfn_service = notfn_service

@service_handler
async def get_initiative_agreement(
Expand Down Expand Up @@ -121,7 +129,7 @@ async def update_initiative_agreement(
# Return the updated initiative agreement schema with the returned status flag
ia_schema = InitiativeAgreementSchema.from_orm(updated_initiative_agreement)
ia_schema.returned = returned

await self._perform_notificaiton_call(ia_schema, re_recommended)
return ia_schema

@service_handler
Expand Down Expand Up @@ -166,7 +174,7 @@ async def create_initiative_agreement(
await self.internal_comment_service.create_internal_comment(
internal_comment_data
)

await self._perform_notificaiton_call(initiative_agreement)
return initiative_agreement

async def director_approve_initiative_agreement(
Expand All @@ -180,7 +188,7 @@ async def director_approve_initiative_agreement(
if not has_director_role:
logger.error(
"Non-Director tried to approve Agreement",
initiative_agreement_id=initiative_agreement.initiative_agreement_id
initiative_agreement_id=initiative_agreement.initiative_agreement_id,
)
raise HTTPException(status_code=403, detail="Forbidden.")

Expand All @@ -200,3 +208,23 @@ async def director_approve_initiative_agreement(
initiative_agreement.transaction_effective_date = datetime.now().date()

await self.repo.refresh_initiative_agreement(initiative_agreement)
await self._perform_notificaiton_call(initiative_agreement)

async def _perform_notificaiton_call(self, ia, re_recommended=False):
"""Send notifications based on the current status of the transfer."""
notifications = INITIATIVE_AGREEMENT_STATUS_NOTIFICATION_MAPPER.get(
ia.current_status.status if not re_recommended else "Return to analyst",
None,
)
notification_data = NotificationMessageSchema(
message=f"Initiative Agreement {ia.initiative_agreement_id} has been {ia.current_status.status}",
related_organization_id=ia.to_organization_id,
origin_user_profile_id=self.request.user.user_profile_id,
)
if notifications and isinstance(notifications, list):
await self.notfn_service.send_notification(
NotificationRequestSchema(
notification_types=notifications,
notification_data=notification_data,
)
)
54 changes: 53 additions & 1 deletion backend/lcfs/web/api/notification/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
NotificationType,
ChannelEnum,
)
from lcfs.db.models.user import UserProfile
from lcfs.web.api.base import NotificationTypeEnum
import structlog

from typing import List, Optional
from fastapi import Depends
from lcfs.db.dependencies import get_async_db_session
from lcfs.web.exception.exceptions import DataNotFoundException

from sqlalchemy import delete, select, func
from sqlalchemy import delete, or_, select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

Expand Down Expand Up @@ -46,6 +48,16 @@ async def create_notification_message(
# await self.db.refresh(notification_message)
return notification_message

@repo_handler
async def create_notification_messages(
self, notification_messages: List[NotificationMessage]
) -> None:
"""
Create bulk notification messages
"""
self.db.add_all(notification_messages)
await self.db.flush()

@repo_handler
async def get_notification_messages_by_user(
self, user_profile_id: int, is_read: Optional[bool] = None
Expand Down Expand Up @@ -245,3 +257,43 @@ async def get_notification_channel_by_name(
)
result = await self.db.execute(query)
return result.scalars().first()

@repo_handler
async def get_subscribed_users_by_channel(
self,
notification_type: NotificationTypeEnum,
channel: ChannelEnum,
organization_id: int = None,
) -> List[int]:
"""
Retrieve a list of user ids subscribed to a notification type
"""
query = (
select(NotificationChannelSubscription)
.join(
NotificationType,
NotificationType.notification_type_id
== NotificationChannelSubscription.notification_type_id,
)
.join(
NotificationChannel,
NotificationChannel.notification_channel_id
== NotificationChannelSubscription.notification_channel_id,
)
.join(
UserProfile,
UserProfile.user_profile_id
== NotificationChannelSubscription.user_profile_id,
)
.filter(
NotificationType.name == notification_type.value,
NotificationChannelSubscription.is_enabled == True,
NotificationChannel.channel_name == channel.value,
or_(
UserProfile.organization_id == organization_id,
UserProfile.organization_id.is_(None),
),
)
)
result = await self.db.execute(query)
return result.scalars().all()
Loading

0 comments on commit 72f6b91

Please sign in to comment.