diff --git a/.github/workflows/docker-auto-test.yaml b/.github/workflows/docker-auto-test.yaml index a60142e70..1c7976d15 100644 --- a/.github/workflows/docker-auto-test.yaml +++ b/.github/workflows/docker-auto-test.yaml @@ -80,6 +80,12 @@ jobs: LCFS_REDIS_PORT: 6379 LCFS_REDIS_PASSWORD: development_only APP_ENVIRONMENT: dev + LCFS_CHES_CLIENT_ID: mock_client_id + LCFS_CHES_CLIENT_SECRET: mock_client_secret + LCFS_CHES_AUTH_URL: http://mock_auth_url + LCFS_CHES_SENDER_EMAIL: noreply@gov.bc.ca + LCFS_CHES_SENDER_NAME: Mock Notification System + LCFS_CHES_EMAIL_URL: http://mock_email_url - name: Upload pytest results if: always() diff --git a/backend/lcfs/tests/compliance_report/test_compliance_report_views.py b/backend/lcfs/tests/compliance_report/test_compliance_report_views.py index aa5ca7675..8d7f1058b 100644 --- a/backend/lcfs/tests/compliance_report/test_compliance_report_views.py +++ b/backend/lcfs/tests/compliance_report/test_compliance_report_views.py @@ -1,7 +1,8 @@ import json +from lcfs.web.api.email.repo import CHESEmailRepository import pytest -from unittest.mock import patch +from unittest.mock import patch, AsyncMock from httpx import AsyncClient from fastapi import FastAPI @@ -16,6 +17,20 @@ ) from lcfs.services.s3.client import DocumentService +@pytest.fixture +def mock_email_repo(): + return AsyncMock(spec=CHESEmailRepository) + +@pytest.fixture +def mock_environment_vars(): + with patch("lcfs.web.api.email.services.settings") as mock_settings: + mock_settings.ches_auth_url = "http://mock_auth_url" + mock_settings.ches_email_url = "http://mock_email_url" + mock_settings.ches_client_id = "mock_client_id" + mock_settings.ches_client_secret = "mock_client_secret" + mock_settings.ches_sender_email = "noreply@gov.bc.ca" + mock_settings.ches_sender_name = "Mock Notification System" + yield mock_settings # get_compliance_periods @pytest.mark.anyio diff --git a/backend/lcfs/tests/compliance_report/test_update_service.py b/backend/lcfs/tests/compliance_report/test_update_service.py index 07285f216..ec4b7e130 100644 --- a/backend/lcfs/tests/compliance_report/test_update_service.py +++ b/backend/lcfs/tests/compliance_report/test_update_service.py @@ -1,5 +1,7 @@ from fastapi import HTTPException from lcfs.db.models.user.Role import RoleEnum +from lcfs.web.api.compliance_report.update_service import ComplianceReportUpdateService +from lcfs.web.api.notification.services import NotificationService import pytest from unittest.mock import AsyncMock, MagicMock, patch from lcfs.db.models.compliance.ComplianceReport import ComplianceReport @@ -24,6 +26,27 @@ def mock_user_has_roles(): yield mock +@pytest.fixture +def mock_notification_service(): + mock_service = AsyncMock(spec=NotificationService) + with patch( + "lcfs.web.api.compliance_report.update_service.Depends", + return_value=mock_service + ): + yield mock_service + + +@pytest.fixture +def mock_environment_vars(): + with patch("lcfs.web.api.email.services.settings") as mock_settings: + mock_settings.ches_auth_url = "http://mock_auth_url" + mock_settings.ches_email_url = "http://mock_email_url" + mock_settings.ches_client_id = "mock_client_id" + mock_settings.ches_client_secret = "mock_client_secret" + mock_settings.ches_sender_email = "noreply@gov.bc.ca" + mock_settings.ches_sender_name = "Mock Notification System" + yield mock_settings + # Mock for adjust_balance method within the OrganizationsService @pytest.fixture def mock_org_service(): @@ -35,7 +58,7 @@ def mock_org_service(): # update_compliance_report @pytest.mark.anyio async def test_update_compliance_report_status_change( - compliance_report_update_service, mock_repo + compliance_report_update_service, mock_repo, mock_notification_service ): # Mock data report_id = 1 @@ -55,6 +78,7 @@ async def test_update_compliance_report_status_change( mock_repo.get_compliance_report_by_id.return_value = mock_report mock_repo.get_compliance_report_status_by_desc.return_value = new_status compliance_report_update_service.handle_status_change = AsyncMock() + compliance_report_update_service.notfn_service = mock_notification_service mock_repo.update_compliance_report.return_value = mock_report # Call the method @@ -80,6 +104,7 @@ async def test_update_compliance_report_status_change( assert mock_report.current_status == new_status assert mock_report.supplemental_note == report_data.supplemental_note + mock_notification_service.send_notification.assert_called_once() @pytest.mark.anyio diff --git a/backend/lcfs/tests/email/test_email_service.py b/backend/lcfs/tests/email/test_email_service.py index 1fbbf06eb..7b8196fc3 100644 --- a/backend/lcfs/tests/email/test_email_service.py +++ b/backend/lcfs/tests/email/test_email_service.py @@ -1,13 +1,16 @@ +from lcfs.web.api.base import NotificationTypeEnum import pytest from unittest.mock import AsyncMock, MagicMock, patch from lcfs.web.api.email.repo import CHESEmailRepository from lcfs.web.api.email.services import CHESEmailService import os + @pytest.fixture def mock_email_repo(): return AsyncMock(spec=CHESEmailRepository) + @pytest.fixture def mock_environment_vars(): with patch("lcfs.web.api.email.services.settings") as mock_settings: @@ -19,14 +22,15 @@ def mock_environment_vars(): mock_settings.ches_sender_name = "Mock Notification System" yield mock_settings + @pytest.mark.anyio async def test_send_notification_email_success(mock_email_repo, mock_environment_vars): # Arrange - notification_type = "INITIATIVE_APPROVED" + notification_type = NotificationTypeEnum.BCEID__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT notification_context = { "subject": "Test Notification", "user_name": "John Doe", - "message_body": "Test message content" + "message_body": "Test message content", } organization_id = 1 @@ -46,21 +50,24 @@ async def test_send_notification_email_success(mock_email_repo, mock_environment # Assert assert result is True mock_email_repo.get_subscribed_user_emails.assert_called_once_with( - notification_type, organization_id + notification_type.value, organization_id # Ensure value is passed ) service._render_email_template.assert_called_once_with( - notification_type, notification_context + notification_type.value, notification_context ) service.send_email.assert_called_once() + @pytest.mark.anyio -async def test_send_notification_email_no_recipients(mock_email_repo, mock_environment_vars): +async def test_send_notification_email_no_recipients( + mock_email_repo, mock_environment_vars +): # Arrange - notification_type = "INITIATIVE_APPROVED" + notification_type = NotificationTypeEnum.BCEID__TRANSFER__PARTNER_ACTIONS notification_context = { "subject": "Test Notification", "user_name": "John Doe", - "message_body": "Test message content" + "message_body": "Test message content", } organization_id = 1 @@ -76,18 +83,19 @@ async def test_send_notification_email_no_recipients(mock_email_repo, mock_envir # Assert assert result is False mock_email_repo.get_subscribed_user_emails.assert_called_once_with( - notification_type, organization_id + notification_type.value, organization_id # Ensure value is passed ) + @pytest.mark.anyio async def test_get_ches_token_success(mock_environment_vars): # Arrange mock_token = "mock_access_token" - with patch('requests.post') as mock_post: + with patch("requests.post") as mock_post: mock_response = MagicMock() mock_response.json.return_value = { - "access_token": mock_token, - "expires_in": 3600 + "access_token": mock_token, + "expires_in": 3600, } mock_post.return_value = mock_response @@ -100,14 +108,15 @@ async def test_get_ches_token_success(mock_environment_vars): assert token == mock_token mock_post.assert_called_once() + @pytest.mark.anyio async def test_get_ches_token_cached(mock_environment_vars): # Arrange - with patch('requests.post') as mock_post: + with patch("requests.post") as mock_post: mock_response = MagicMock() mock_response.json.return_value = { - "access_token": "initial_token", - "expires_in": 3600 + "access_token": "initial_token", + "expires_in": 3600, } mock_post.return_value = mock_response @@ -124,4 +133,4 @@ async def test_get_ches_token_cached(mock_environment_vars): # Assert assert first_token == second_token - mock_post.assert_not_called() \ No newline at end of file + mock_post.assert_not_called() diff --git a/backend/lcfs/tests/initiative_agreement/test_initiative_agreement_services.py b/backend/lcfs/tests/initiative_agreement/test_initiative_agreement_services.py index c7376b613..85d0299a9 100644 --- a/backend/lcfs/tests/initiative_agreement/test_initiative_agreement_services.py +++ b/backend/lcfs/tests/initiative_agreement/test_initiative_agreement_services.py @@ -126,6 +126,7 @@ async def test_update_initiative_agreement(service, mock_repo, mock_request): mock_repo.get_initiative_agreement_by_id.return_value = mock_agreement mock_repo.get_initiative_agreement_status_by_name.return_value = mock_status mock_repo.update_initiative_agreement.return_value = mock_agreement + service.notfn_service = AsyncMock() update_data = InitiativeAgreementUpdateSchema( initiative_agreement_id=1, diff --git a/backend/lcfs/tests/transfer/test_transfer_services.py b/backend/lcfs/tests/transfer/test_transfer_services.py index dda27112c..d9e30abfb 100644 --- a/backend/lcfs/tests/transfer/test_transfer_services.py +++ b/backend/lcfs/tests/transfer/test_transfer_services.py @@ -1,5 +1,5 @@ import pytest -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, patch from lcfs.web.api.transfer.schema import TransferSchema from datetime import date from lcfs.db.models.transfer import Transfer @@ -63,22 +63,26 @@ async def test_get_transfer_success(transfer_service, mock_transfer_repo): @pytest.mark.anyio async def test_create_transfer_success(transfer_service, mock_transfer_repo): mock_transfer_repo.get_transfer_status_by_name.return_value = TransferStatus( - transfer_status_id=1, status="status" + transfer_status_id=1, status="Sent" ) mock_transfer_repo.add_transfer_history.return_value = True + transfer_id = 1 - transfer = TransferCreateSchema( + transfer_data = TransferCreateSchema( transfer_id=transfer_id, from_organization_id=1, to_organization_id=2, price_per_unit=5.75, ) - mock_transfer_repo.create_transfer.return_value = transfer + mock_transfer_repo.create_transfer.return_value = transfer_data - result = await transfer_service.create_transfer(transfer) + # Patch the _perform_notificaiton_call method + with patch.object(transfer_service, "_perform_notificaiton_call", AsyncMock()): + result = await transfer_service.create_transfer(transfer_data) - assert result.transfer_id == transfer_id - assert isinstance(result, TransferCreateSchema) + assert result.transfer_id == transfer_id + assert isinstance(result, TransferCreateSchema) + transfer_service._perform_notificaiton_call.assert_called_once() @pytest.mark.anyio diff --git a/backend/lcfs/web/api/base.py b/backend/lcfs/web/api/base.py index ed21cbadc..ecb1d3693 100644 --- a/backend/lcfs/web/api/base.py +++ b/backend/lcfs/web/api/base.py @@ -372,3 +372,25 @@ async def lcfs_cache_key_builder( # Return the cache key return cache_key + +class NotificationTypeEnum(Enum): + BCEID__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT = "BCEID__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT" + BCEID__INITIATIVE_AGREEMENT__DIRECTOR_APPROVAL = "BCEID__INITIATIVE_AGREEMENT__DIRECTOR_APPROVAL" + BCEID__TRANSFER__DIRECTOR_DECISION = "BCEID__TRANSFER__DIRECTOR_DECISION" + BCEID__TRANSFER__PARTNER_ACTIONS = "BCEID__TRANSFER__PARTNER_ACTIONS" + IDIR_ANALYST__COMPLIANCE_REPORT__DIRECTOR_DECISION = "IDIR_ANALYST__COMPLIANCE_REPORT__DIRECTOR_DECISION" + IDIR_ANALYST__COMPLIANCE_REPORT__MANAGER_RECOMMENDATION = "IDIR_ANALYST__COMPLIANCE_REPORT__MANAGER_RECOMMENDATION" + IDIR_ANALYST__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW = "IDIR_ANALYST__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW" + IDIR_ANALYST__INITIATIVE_AGREEMENT__RETURNED_TO_ANALYST = "IDIR_ANALYST__INITIATIVE_AGREEMENT__RETURNED_TO_ANALYST" + IDIR_ANALYST__TRANSFER__DIRECTOR_RECORDED = "IDIR_ANALYST__TRANSFER__DIRECTOR_RECORDED" + IDIR_ANALYST__TRANSFER__RESCINDED_ACTION = "IDIR_ANALYST__TRANSFER__RESCINDED_ACTION" + IDIR_ANALYST__TRANSFER__SUBMITTED_FOR_REVIEW = "IDIR_ANALYST__TRANSFER__SUBMITTED_FOR_REVIEW" + IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__ANALYST_RECOMMENDATION = "IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__ANALYST_RECOMMENDATION" + IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT = "IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT" + IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW = "IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW" + IDIR_DIRECTOR__COMPLIANCE_REPORT__MANAGER_RECOMMENDATION = "IDIR_DIRECTOR__COMPLIANCE_REPORT__MANAGER_RECOMMENDATION" + IDIR_DIRECTOR__INITIATIVE_AGREEMENT__ANALYST_RECOMMENDATION = "IDIR_DIRECTOR__INITIATIVE_AGREEMENT__ANALYST_RECOMMENDATION" + IDIR_DIRECTOR__TRANSFER__ANALYST_RECOMMENDATION = "IDIR_DIRECTOR__TRANSFER__ANALYST_RECOMMENDATION" + + def __str__(self): + return self.value \ No newline at end of file diff --git a/backend/lcfs/web/api/compliance_report/update_service.py b/backend/lcfs/web/api/compliance_report/update_service.py index dd79fc6a1..7e76ea76b 100644 --- a/backend/lcfs/web/api/compliance_report/update_service.py +++ b/backend/lcfs/web/api/compliance_report/update_service.py @@ -1,5 +1,10 @@ from fastapi import Depends, HTTPException, Request -from sqlalchemy.exc import InvalidRequestError +from lcfs.web.api.notification.schema import ( + COMPLIANCE_REPORT_STATUS_NOTIFICATION_MAPPER, + NotificationMessageSchema, + NotificationRequestSchema, +) +from lcfs.web.api.notification.services import NotificationService from lcfs.db.models.compliance.ComplianceReport import ComplianceReport from lcfs.db.models.compliance.ComplianceReportStatus import ComplianceReportStatusEnum @@ -24,12 +29,14 @@ def __init__( summary_service: ComplianceReportSummaryService = Depends(), org_service: OrganizationsService = Depends(OrganizationsService), trx_service: TransactionsService = Depends(TransactionsService), + notfn_service: NotificationService = Depends(NotificationService), ): self.repo = repo self.request = request self.summary_service = summary_service self.org_service = org_service self.trx_service = trx_service + self.notfn_service = notfn_service async def update_compliance_report( self, report_id: int, report_data: ComplianceReportUpdateSchema @@ -42,15 +49,34 @@ async def update_compliance_report( f"Compliance report with ID {report_id} not found" ) + notifications = None + notification_data: NotificationMessageSchema = NotificationMessageSchema( + message=f"Compliance report {report.compliance_report_id} has been updated", + related_organization_id=report.organization_id, + origin_user_profile_id=self.request.user.user_profile_id, + ) # if we're just returning the compliance report back to either compliance manager or analyst, # then neither history nor any updates to summary is required. if report_data.status in RETURN_STATUSES: status_has_changed = False - report_data.status = ( - "Submitted" - if report_data.status == "Return to analyst" - else ComplianceReportStatusEnum.Recommended_by_analyst.value + notifications = COMPLIANCE_REPORT_STATUS_NOTIFICATION_MAPPER.get( + report_data.status ) + if report_data.status == "Return to analyst": + report_data.status = ComplianceReportStatusEnum.Submitted.value + notification_data.message = f"Compliance report {report.compliance_report_id} has been returned to analyst" + else: + report_data.status = ( + ComplianceReportStatusEnum.Recommended_by_analyst.value + ) + + notification_data.message = f"Compliance report {report.compliance_report_id} has been returned by director" + notification_data.related_user_profile_id = [ + h.user_profile.user_profile_id + for h in report.history + if h.status.status + == ComplianceReportStatusEnum.Recommended_by_analyst + ][0] else: status_has_changed = report.current_status.status != getattr( ComplianceReportStatusEnum, report_data.status.replace(" ", "_") @@ -65,10 +91,21 @@ async def update_compliance_report( updated_report = await self.repo.update_compliance_report(report) if status_has_changed: await self.handle_status_change(report, new_status.status) - + notification_data.message = ( + f"Compliance report {report.compliance_report_id} has been updated" + ) + notifications = COMPLIANCE_REPORT_STATUS_NOTIFICATION_MAPPER.get( + new_status.status + ) # Add history record await self.repo.add_compliance_report_history(report, self.request.user) - + if notifications and isinstance(notifications, list): + await self.notfn_service.send_notification( + NotificationRequestSchema( + notification_types=notifications, + notification_data=notification_data, + ) + ) return updated_report async def handle_status_change( @@ -182,7 +219,6 @@ async def handle_submitted_status(self, report: ComplianceReport): # Update the report with the new summary report.summary = new_summary - if report.summary.line_20_surplus_deficit_units != 0: # Create a new reserved transaction for receiving organization report.transaction = await self.org_service.adjust_balance( diff --git a/backend/lcfs/web/api/email/repo.py b/backend/lcfs/web/api/email/repo.py index 7d8c6cd6e..7d5b010b3 100644 --- a/backend/lcfs/web/api/email/repo.py +++ b/backend/lcfs/web/api/email/repo.py @@ -6,7 +6,7 @@ ) from lcfs.web.core.decorators import repo_handler from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy import select, or_ +from sqlalchemy import func, select, or_ from typing import List from lcfs.db.models.user import UserProfile from lcfs.db.dependencies import get_async_db_session diff --git a/backend/lcfs/web/api/email/template_mapping.py b/backend/lcfs/web/api/email/schema.py similarity index 84% rename from backend/lcfs/web/api/email/template_mapping.py rename to backend/lcfs/web/api/email/schema.py index 4aa78e163..cd7239f39 100644 --- a/backend/lcfs/web/api/email/template_mapping.py +++ b/backend/lcfs/web/api/email/schema.py @@ -1,3 +1,11 @@ +from typing import Optional +from lcfs.web.api.base import NotificationTypeEnum +from pydantic import BaseModel, Field + +class EmailNotificationRequest(BaseModel): + notification_type: NotificationTypeEnum = Field(..., description="Type of notification") + organization_id: Optional[int] = Field(None, description="Organization ID associated with the notification") + TEMPLATE_MAPPING = { "BCEID__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT": "bceid__compliance_report__director_assessment.html", "BCEID__INITIATIVE_AGREEMENT__DIRECTOR_APPROVAL": "bceid__initiative_agreement__director_approval.html", diff --git a/backend/lcfs/web/api/email/services.py b/backend/lcfs/web/api/email/services.py index 602283e03..8c7dc4cd8 100644 --- a/backend/lcfs/web/api/email/services.py +++ b/backend/lcfs/web/api/email/services.py @@ -1,4 +1,5 @@ import os +from lcfs.web.api.base import NotificationTypeEnum import requests import structlog from fastapi import Depends @@ -9,7 +10,7 @@ from lcfs.settings import settings from lcfs.web.api.email.repo import CHESEmailRepository from lcfs.web.core.decorators import service_handler -from lcfs.web.api.email.template_mapping import TEMPLATE_MAPPING +from lcfs.web.api.email.schema import TEMPLATE_MAPPING logger = structlog.get_logger(__name__) @@ -54,7 +55,7 @@ def _validate_configuration(self): @service_handler async def send_notification_email( self, - notification_type: str, + notification_type: NotificationTypeEnum, notification_context: Dict[str, Any], organization_id: int, ) -> bool: @@ -63,16 +64,16 @@ async def send_notification_email( """ # Retrieve subscribed user emails recipient_emails = await self.repo.get_subscribed_user_emails( - notification_type, organization_id + notification_type.value, organization_id ) if not recipient_emails: logger.info(f"""No subscribers for notification type: { - notification_type}""") + notification_type.value}""") return False # Render the email content email_body = self._render_email_template( - notification_type, notification_context + notification_type.value, notification_context ) # Build email payload diff --git a/backend/lcfs/web/api/email/views.py b/backend/lcfs/web/api/email/views.py index 07d87924d..4f448e96a 100644 --- a/backend/lcfs/web/api/email/views.py +++ b/backend/lcfs/web/api/email/views.py @@ -1,5 +1,6 @@ -from fastapi import APIRouter, Depends, status, Request +from fastapi import APIRouter, Depends, HTTPException, status, Request from typing import Dict +from lcfs.web.api.email.schema import EmailNotificationRequest from lcfs.web.api.email.services import CHESEmailService from lcfs.web.core.decorators import view_handler @@ -15,6 +16,7 @@ @view_handler(["*"]) async def test_email_notification( request: Request, + payload: EmailNotificationRequest, service: CHESEmailService = Depends(), ): """ @@ -28,9 +30,9 @@ async def test_email_notification( "message_body": "This is a test notification email from LCFS Notification System.", } - # Notification type and organization for testing - notification_type = "INITIATIVE_APPROVED" - organization_id = None # Replace with a valid organization ID for testing + # Extract notification type and organization ID from the request payload + notification_type = payload.notification_type + organization_id = payload.organization_id # Trigger the email sending process success = await service.send_notification_email( @@ -42,4 +44,7 @@ async def test_email_notification( if success: return {"status": "success", "message": "Test email sent successfully."} else: - return {"status": "failure", "message": "Failed to send test email."} + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to send test email." + ) diff --git a/backend/lcfs/web/api/initiative_agreement/services.py b/backend/lcfs/web/api/initiative_agreement/services.py index 341153d7f..b7697f2a4 100644 --- a/backend/lcfs/web/api/initiative_agreement/services.py +++ b/backend/lcfs/web/api/initiative_agreement/services.py @@ -1,3 +1,9 @@ +from lcfs.web.api.notification.schema import ( + INITIATIVE_AGREEMENT_STATUS_NOTIFICATION_MAPPER, + NotificationMessageSchema, + NotificationRequestSchema, +) +from lcfs.web.api.notification.services import NotificationService import structlog from datetime import datetime from fastapi import Depends, Request, HTTPException @@ -34,12 +40,14 @@ def __init__( internal_comment_service: InternalCommentService = Depends( InternalCommentService ), + notfn_service: NotificationService = Depends(NotificationService), request: Request = None, ) -> None: self.repo = repo self.org_service = org_service self.internal_comment_service = internal_comment_service self.request = request + self.notfn_service = notfn_service @service_handler async def get_initiative_agreement( @@ -121,7 +129,7 @@ async def update_initiative_agreement( # Return the updated initiative agreement schema with the returned status flag ia_schema = InitiativeAgreementSchema.from_orm(updated_initiative_agreement) ia_schema.returned = returned - + await self._perform_notificaiton_call(ia_schema, re_recommended) return ia_schema @service_handler @@ -166,7 +174,7 @@ async def create_initiative_agreement( await self.internal_comment_service.create_internal_comment( internal_comment_data ) - + await self._perform_notificaiton_call(initiative_agreement) return initiative_agreement async def director_approve_initiative_agreement( @@ -180,7 +188,7 @@ async def director_approve_initiative_agreement( if not has_director_role: logger.error( "Non-Director tried to approve Agreement", - initiative_agreement_id=initiative_agreement.initiative_agreement_id + initiative_agreement_id=initiative_agreement.initiative_agreement_id, ) raise HTTPException(status_code=403, detail="Forbidden.") @@ -200,3 +208,23 @@ async def director_approve_initiative_agreement( initiative_agreement.transaction_effective_date = datetime.now().date() await self.repo.refresh_initiative_agreement(initiative_agreement) + await self._perform_notificaiton_call(initiative_agreement) + + async def _perform_notificaiton_call(self, ia, re_recommended=False): + """Send notifications based on the current status of the transfer.""" + notifications = INITIATIVE_AGREEMENT_STATUS_NOTIFICATION_MAPPER.get( + ia.current_status.status if not re_recommended else "Return to analyst", + None, + ) + notification_data = NotificationMessageSchema( + message=f"Initiative Agreement {ia.initiative_agreement_id} has been {ia.current_status.status}", + related_organization_id=ia.to_organization_id, + origin_user_profile_id=self.request.user.user_profile_id, + ) + if notifications and isinstance(notifications, list): + await self.notfn_service.send_notification( + NotificationRequestSchema( + notification_types=notifications, + notification_data=notification_data, + ) + ) diff --git a/backend/lcfs/web/api/notification/repo.py b/backend/lcfs/web/api/notification/repo.py index c12689ee4..ec32f9716 100644 --- a/backend/lcfs/web/api/notification/repo.py +++ b/backend/lcfs/web/api/notification/repo.py @@ -5,6 +5,8 @@ NotificationType, ChannelEnum, ) +from lcfs.db.models.user import UserProfile +from lcfs.web.api.base import NotificationTypeEnum import structlog from typing import List, Optional @@ -12,7 +14,7 @@ from lcfs.db.dependencies import get_async_db_session from lcfs.web.exception.exceptions import DataNotFoundException -from sqlalchemy import delete, select, func +from sqlalchemy import delete, or_, select, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload @@ -46,6 +48,16 @@ async def create_notification_message( # await self.db.refresh(notification_message) return notification_message + @repo_handler + async def create_notification_messages( + self, notification_messages: List[NotificationMessage] + ) -> None: + """ + Create bulk notification messages + """ + self.db.add_all(notification_messages) + await self.db.flush() + @repo_handler async def get_notification_messages_by_user( self, user_profile_id: int, is_read: Optional[bool] = None @@ -245,3 +257,43 @@ async def get_notification_channel_by_name( ) result = await self.db.execute(query) return result.scalars().first() + + @repo_handler + async def get_subscribed_users_by_channel( + self, + notification_type: NotificationTypeEnum, + channel: ChannelEnum, + organization_id: int = None, + ) -> List[int]: + """ + Retrieve a list of user ids subscribed to a notification type + """ + query = ( + select(NotificationChannelSubscription) + .join( + NotificationType, + NotificationType.notification_type_id + == NotificationChannelSubscription.notification_type_id, + ) + .join( + NotificationChannel, + NotificationChannel.notification_channel_id + == NotificationChannelSubscription.notification_channel_id, + ) + .join( + UserProfile, + UserProfile.user_profile_id + == NotificationChannelSubscription.user_profile_id, + ) + .filter( + NotificationType.name == notification_type.value, + NotificationChannelSubscription.is_enabled == True, + NotificationChannel.channel_name == channel.value, + or_( + UserProfile.organization_id == organization_id, + UserProfile.organization_id.is_(None), + ), + ) + ) + result = await self.db.execute(query) + return result.scalars().all() diff --git a/backend/lcfs/web/api/notification/schema.py b/backend/lcfs/web/api/notification/schema.py index 419f212c5..0176b9bdd 100644 --- a/backend/lcfs/web/api/notification/schema.py +++ b/backend/lcfs/web/api/notification/schema.py @@ -1,6 +1,11 @@ -from typing import Optional +from typing import Any, Dict, List, Optional -from lcfs.web.api.base import BaseSchema +from lcfs.db.models.compliance.ComplianceReportStatus import ComplianceReportStatusEnum +from lcfs.db.models.initiative_agreement.InitiativeAgreementStatus import ( + InitiativeAgreementStatusEnum, +) +from lcfs.db.models.transfer.TransferStatus import TransferStatusEnum +from lcfs.web.api.base import BaseSchema, NotificationTypeEnum class NotificationMessageSchema(BaseSchema): @@ -46,3 +51,75 @@ class DeleteSubscriptionSchema(BaseSchema): class DeleteNotificationChannelSubscriptionResponseSchema(BaseSchema): message: str + + +class NotificationRequestSchema(BaseSchema): + notification_types: List[NotificationTypeEnum] + notification_context: Optional[Dict[str, Any]] = {} + notification_data: Optional[NotificationMessageSchema] = None + + +COMPLIANCE_REPORT_STATUS_NOTIFICATION_MAPPER = { + ComplianceReportStatusEnum.Submitted: [ + NotificationTypeEnum.IDIR_ANALYST__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW, + NotificationTypeEnum.IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW, + ], + ComplianceReportStatusEnum.Recommended_by_analyst: [ + NotificationTypeEnum.IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__ANALYST_RECOMMENDATION + ], + ComplianceReportStatusEnum.Recommended_by_manager: [ + NotificationTypeEnum.IDIR_DIRECTOR__COMPLIANCE_REPORT__MANAGER_RECOMMENDATION + ], + ComplianceReportStatusEnum.Assessed: [ + NotificationTypeEnum.IDIR_ANALYST__COMPLIANCE_REPORT__DIRECTOR_DECISION, + NotificationTypeEnum.IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT, + NotificationTypeEnum.BCEID__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT, + ], + ComplianceReportStatusEnum.ReAssessed: [ + NotificationTypeEnum.IDIR_ANALYST__COMPLIANCE_REPORT__DIRECTOR_DECISION, + NotificationTypeEnum.IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT, + NotificationTypeEnum.BCEID__COMPLIANCE_REPORT__DIRECTOR_ASSESSMENT, + ], + "Return to analyst": [ + NotificationTypeEnum.IDIR_ANALYST__COMPLIANCE_REPORT__SUBMITTED_FOR_REVIEW + ], + "Return to manager": [ + NotificationTypeEnum.IDIR_COMPLIANCE_MANAGER__COMPLIANCE_REPORT__ANALYST_RECOMMENDATION + ], +} + + +TRANSFER_STATUS_NOTIFICATION_MAPPER = { + TransferStatusEnum.Sent: [ + NotificationTypeEnum.BCEID__TRANSFER__PARTNER_ACTIONS, + ], + TransferStatusEnum.Declined: [ + NotificationTypeEnum.BCEID__TRANSFER__PARTNER_ACTIONS, + ], + TransferStatusEnum.Submitted: [ + NotificationTypeEnum.IDIR_ANALYST__TRANSFER__SUBMITTED_FOR_REVIEW + ], + TransferStatusEnum.Recommended: [ + NotificationTypeEnum.IDIR_DIRECTOR__TRANSFER__ANALYST_RECOMMENDATION + ], + TransferStatusEnum.Refused: [ + NotificationTypeEnum.IDIR_ANALYST__TRANSFER__RESCINDED_ACTION, + NotificationTypeEnum.BCEID__TRANSFER__DIRECTOR_DECISION, + ], + TransferStatusEnum.Recorded: [ + NotificationTypeEnum.BCEID__TRANSFER__DIRECTOR_DECISION, + NotificationTypeEnum.IDIR_ANALYST__TRANSFER__DIRECTOR_RECORDED, + ], +} + +INITIATIVE_AGREEMENT_STATUS_NOTIFICATION_MAPPER = { + InitiativeAgreementStatusEnum.Recommended: [ + NotificationTypeEnum.IDIR_DIRECTOR__INITIATIVE_AGREEMENT__ANALYST_RECOMMENDATION + ], + InitiativeAgreementStatusEnum.Approved: [ + NotificationTypeEnum.BCEID__INITIATIVE_AGREEMENT__DIRECTOR_APPROVAL, + ], + "Return to analyst": [ + NotificationTypeEnum.IDIR_ANALYST__INITIATIVE_AGREEMENT__RETURNED_TO_ANALYST + ], +} diff --git a/backend/lcfs/web/api/notification/services.py b/backend/lcfs/web/api/notification/services.py index f9bc27951..e64848823 100644 --- a/backend/lcfs/web/api/notification/services.py +++ b/backend/lcfs/web/api/notification/services.py @@ -1,10 +1,13 @@ -from typing import Optional +from typing import List, Optional, Union from lcfs.db.models.notification import ( NotificationChannelSubscription, NotificationMessage, ChannelEnum, ) +from lcfs.web.api.base import NotificationTypeEnum +from lcfs.web.api.email.services import CHESEmailService from lcfs.web.api.notification.schema import ( + NotificationRequestSchema, SubscriptionSchema, NotificationMessageSchema, ) @@ -18,10 +21,14 @@ class NotificationService: + def __init__( - self, repo: NotificationRepository = Depends(NotificationRepository) + self, + repo: NotificationRepository = Depends(NotificationRepository), + email_service: CHESEmailService = Depends(CHESEmailService), ) -> None: self.repo = repo + self.email_service = email_service @service_handler async def get_notification_messages_by_user_id( @@ -189,3 +196,40 @@ async def delete_notification_channel_subscription( await self.repo.delete_notification_channel_subscription(subscription_id) logger.info(f"Deleted notification channel subscription {subscription_id}.") + + @service_handler + async def send_notification(self, notification: NotificationRequestSchema): + """ + Send subscribed notifications to users. + """ + # Prepare context once, outside the loop + notification.notification_context.update( + {"organization_id": notification.notification_data.related_organization_id} + ) + + for notification_type in notification.notification_types: + in_app_subscribed_users = await self.repo.get_subscribed_users_by_channel( + notification_type, + ChannelEnum.IN_APP, + notification.notification_data.related_organization_id, + ) + + # Batch create in-app notifications + in_app_notifications = [ + NotificationMessage( + **notification.notification_data.model_dump( + exclude_unset=True, exclude={"deleted"} + ), + notification_type_id=subscription.notification_type_id, + related_user_profile_id=subscription.user_profile_id, + ) + for subscription in in_app_subscribed_users + ] + if in_app_notifications: + await self.repo.create_notification_messages(in_app_notifications) + + await self.email_service.send_notification_email( + notification_type, + notification.notification_context, + notification.notification_data.related_organization_id, + ) diff --git a/backend/lcfs/web/api/transfer/services.py b/backend/lcfs/web/api/transfer/services.py index 7dcde6c63..f498d927e 100644 --- a/backend/lcfs/web/api/transfer/services.py +++ b/backend/lcfs/web/api/transfer/services.py @@ -1,3 +1,9 @@ +from lcfs.web.api.notification.schema import ( + TRANSFER_STATUS_NOTIFICATION_MAPPER, + NotificationMessageSchema, + NotificationRequestSchema, +) +from lcfs.web.api.notification.services import NotificationService import structlog from typing import List, Optional from fastapi import Depends, Request, HTTPException @@ -44,6 +50,7 @@ def __init__( org_repo: OrganizationsRepository = Depends(OrganizationsRepository), org_service: OrganizationsService = Depends(OrganizationsService), transaction_repo: TransactionRepository = Depends(TransactionRepository), + notfn_service: NotificationService = Depends(NotificationService), ) -> None: self.validate = validate self.repo = repo @@ -51,6 +58,7 @@ def __init__( self.org_repo = org_repo self.org_service = org_service self.transaction_repo = transaction_repo + self.notfn_service = notfn_service @service_handler async def get_all_transfers(self) -> List[TransferSchema]: @@ -147,6 +155,7 @@ async def create_transfer( # transfer.transfer_category_id = 1 transfer.current_status = current_status + notifications = TRANSFER_STATUS_NOTIFICATION_MAPPER.get(current_status.status) if current_status.status == TransferStatusEnum.Sent: await self.sign_and_send_from_supplier(transfer) @@ -157,6 +166,7 @@ async def create_transfer( current_status.transfer_status_id, self.request.user.user_profile_id, ) + await self._perform_notificaiton_call(notifications, transfer) return transfer @service_handler @@ -253,7 +263,44 @@ async def update_transfer(self, transfer_data: TransferCreateSchema) -> Transfer # Finally, update the transfer's status and save the changes transfer.current_status = new_status - return await self.repo.update_transfer(transfer) + transfer_result = await self.repo.update_transfer(transfer) + await self._perform_notificaiton_call(transfer_result) + return transfer_result + + async def _perform_notificaiton_call(self, transfer): + """Send notifications based on the current status of the transfer.""" + notifications = TRANSFER_STATUS_NOTIFICATION_MAPPER.get( + transfer.current_status.status + ) + notification_data = NotificationMessageSchema( + message=f"Transfer {transfer.transfer_id} has been updated", + origin_user_profile_id=self.request.user.user_profile_id, + ) + if notifications and isinstance(notifications, list): + notification_data.related_organization_id = ( + transfer.from_organization_id + if transfer.current_status.status == TransferStatusEnum.Declined + else transfer.to_organization_id + ) + await self.notfn_service.send_notification( + NotificationRequestSchema( + notification_types=notifications, + notification_data=notification_data, + ) + ) + if transfer.current_status.status in [ + TransferStatusEnum.Refused, + TransferStatusEnum.Recorded, + ]: + notification_data.related_organization_id = ( + transfer.from_organization_id + ) + await self.notfn_service.send_notification( + NotificationRequestSchema( + notification_types=notifications, + notification_data=notification_data, + ) + ) async def sign_and_send_from_supplier(self, transfer): """Create reserved transaction to reserve compliance units for sending organization."""