From cf1a56bc8ff518884f83e81072359e2e31efd176 Mon Sep 17 00:00:00 2001 From: Pradip Thapa Date: Thu, 14 Nov 2024 10:40:15 +0545 Subject: [PATCH] refactor: add task validation with pydantic, Improve DB handling & error logging (#328) * refac: refactor database connection handling and improve error logging in task processing * feat: add task retrieval endpoint with Pydantic validation * feat: Add Pydantic validation for task statistics endpoint response * refactor: vove event handling logic from route.py to logic.py --- src/backend/app/projects/image_processing.py | 38 +- src/backend/app/projects/project_logic.py | 1 - src/backend/app/tasks/task_logic.py | 348 +++++++++++++++- src/backend/app/tasks/task_routes.py | 404 +------------------ src/backend/app/tasks/task_schemas.py | 121 +++++- 5 files changed, 500 insertions(+), 412 deletions(-) diff --git a/src/backend/app/projects/image_processing.py b/src/backend/app/projects/image_processing.py index 16f55ffd..fcb6faff 100644 --- a/src/backend/app/projects/image_processing.py +++ b/src/backend/app/projects/image_processing.py @@ -102,8 +102,6 @@ def process_new_task( :return: The created task object. """ opts = self.options_list_to_dict(options) - # FIXME: take this from the function above - opts = {"dsm": True} task = self.node.create_task( images, opts, name, progress_callback, webhook=webhook ) @@ -259,26 +257,32 @@ async def download_and_upload_assets_from_odm_to_s3( f"Orthophoto for task {task_id} successfully uploaded to S3 at {s3_ortho_path}" ) - # Update background task status to COMPLETED - pool = await database.get_db_connection_pool() + # NOTE: This function uses a separate database connection pool because it is called by an internal server + # and doesn't rely on FastAPI's request context. This allows independent database access outside FastAPI's lifecycle. - async with pool.connection() as conn: - await task_logic.update_task_state( - db=conn, - project_id=dtm_project_id, - task_id=dtm_task_id, - user_id=user_id, - comment=comment, - initial_state=current_state, - final_state=State.IMAGE_PROCESSED, - updated_at=timestamp(), - ) + pool = await database.get_db_connection_pool() + async with pool as pool_instance: + async with pool_instance.connection() as conn: + await task_logic.update_task_state( + db=conn, + project_id=dtm_project_id, + task_id=dtm_task_id, + user_id=user_id, + comment=comment, + initial_state=current_state, + final_state=State.IMAGE_PROCESSED, + updated_at=timestamp(), + ) + log.info( + f"Task {dtm_task_id} state updated to IMAGE_PROCESSED in the database." + ) except Exception as e: - log.error(f"Error downloading or uploading assets for task {task_id}: {e}") + log.error( + f"An error occurred in the download, upload, or status update steps for task {task_id}. Details: {e}" + ) finally: - # Clean up the temporary directory if os.path.exists(output_file_path): try: shutil.rmtree(output_file_path) diff --git a/src/backend/app/projects/project_logic.py b/src/backend/app/projects/project_logic.py index 63513717..82e66d03 100644 --- a/src/backend/app/projects/project_logic.py +++ b/src/backend/app/projects/project_logic.py @@ -211,7 +211,6 @@ def process_drone_images( options = [ {"name": "dsm", "value": True}, {"name": "orthophoto-resolution", "value": 5}, - {"name": "cog", "value": True}, ] webhook_url = f"{settings.BACKEND_URL}/api/projects/odm/webhook/{user_id}/{project_id}/{task_id}/" diff --git a/src/backend/app/tasks/task_logic.py b/src/backend/app/tasks/task_logic.py index 398a67b2..ce15fa1b 100644 --- a/src/backend/app/tasks/task_logic.py +++ b/src/backend/app/tasks/task_logic.py @@ -1,10 +1,60 @@ import uuid import json +from app.users.user_schemas import AuthUser +from app.tasks.task_schemas import NewEvent, TaskStats +from app.users import user_schemas +from app.utils import render_email_template, send_notification_email from psycopg import Connection -from app.models.enums import HTTPStatus, State -from fastapi import HTTPException -from psycopg.rows import dict_row +from app.models.enums import EventType, HTTPStatus, State, UserRole +from fastapi import HTTPException, BackgroundTasks +from psycopg.rows import dict_row, class_row from datetime import datetime +from app.config import settings + + +async def get_task_stats(db: Connection, user_data: AuthUser): + try: + async with db.cursor(row_factory=class_row(TaskStats)) as cur: + raw_sql = """ + SELECT + COUNT(CASE WHEN te.state = 'REQUEST_FOR_MAPPING' THEN 1 END) AS request_logs, + COUNT(CASE WHEN te.state IN ('LOCKED_FOR_MAPPING', 'IMAGE_UPLOADED', 'IMAGE_PROCESSING_FAILED') THEN 1 END) AS ongoing_tasks, + COUNT(CASE WHEN te.state = 'IMAGE_PROCESSED' THEN 1 END) AS completed_tasks, + COUNT(CASE WHEN te.state = 'UNFLYABLE_TASK' THEN 1 END) AS unflyable_tasks + + FROM ( + SELECT DISTINCT ON (te.task_id) + te.task_id, + te.state, + te.created_at + FROM task_events te + WHERE + ( + %(role)s = 'DRONE_PILOT' + AND te.user_id = %(user_id)s + ) + OR + (%(role)s = 'PROJECT_CREATOR' AND te.project_id IN ( + SELECT p.id + FROM projects p + WHERE p.author_id = %(user_id)s + )) + ORDER BY te.task_id, te.created_at DESC + ) AS te; + """ + + await cur.execute( + raw_sql, {"user_id": user_data.id, "role": user_data.role} + ) + db_counts = await cur.fetchone() + + return db_counts + + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=f"Failed to fetch task statistics. {e}", + ) async def update_take_off_point_in_db( @@ -217,3 +267,295 @@ async def get_task_state( status_code=500, detail=f"An error occurred while retrieving the task state: {str(e)}", ) + + +async def handle_event( + db: Connection, + project_id: uuid.UUID, + task_id: uuid.UUID, + user_id: str, + project: dict, + user_role: UserRole, + detail: NewEvent, + user_data: AuthUser, + background_tasks: BackgroundTasks, +): + match detail.event: + case EventType.REQUESTS: + # Determine the appropriate state and message + is_author = project["author_id"] == user_id + if user_role != UserRole.DRONE_PILOT.name and not is_author: + raise HTTPException( + status_code=403, + detail="Only the project author or drone operators can request tasks for this project.", + ) + + requires_approval = project["requires_approval_from_manager_for_locking"] + + if is_author or not requires_approval: + state_after = State.LOCKED_FOR_MAPPING + message = "Request accepted automatically" + ( + " as the author" if is_author else "" + ) + else: + state_after = State.REQUEST_FOR_MAPPING + message = "Request for mapping" + + # Perform the mapping request + data = await request_mapping( + db, + project_id, + task_id, + user_id, + message, + State.UNLOCKED_TO_MAP, + state_after, + detail.updated_at, + ) + # Send email notification if approval is required + if state_after == State.REQUEST_FOR_MAPPING: + author = await user_schemas.DbUser.get_user_by_id( + db, project["author_id"] + ) + html_content = render_email_template( + folder_name="mapping", + template_name="requests.html", + context={ + "name": author["name"], + "drone_operator_name": user_data.name, + "task_id": task_id, + "project_id": project_id, + "project_name": project["name"], + "description": project["description"], + "FRONTEND_URL": settings.FRONTEND_URL, + }, + ) + background_tasks.add_task( + send_notification_email, + author["email_address"], + "Request for mapping", + html_content, + ) + + return data + + case EventType.MAP: + if user_id != project["author_id"]: + raise HTTPException( + status_code=403, + detail="Only the project creator can approve the mapping.", + ) + + requested_user_id = await user_schemas.DbUser.get_requested_user_id( + db, project_id, task_id + ) + drone_operator = await user_schemas.DbUser.get_user_by_id( + db, requested_user_id + ) + html_content = render_email_template( + folder_name="mapping", + template_name="approved_or_rejected.html", + context={ + "email_subject": "Mapping Request Approved", + "email_body": "We are pleased to inform you that your mapping request has been approved. Your contribution is invaluable to our efforts in improving humanitarian responses worldwide.", + "task_status": "approved", + "name": user_data.name, + "drone_operator_name": drone_operator["name"], + "task_id": task_id, + "project_id": project_id, + "project_name": project["name"], + "description": project["description"], + "FRONTEND_URL": settings.FRONTEND_URL, + }, + ) + + background_tasks.add_task( + send_notification_email, + drone_operator["email_address"], + "Task is approved", + html_content, + ) + + return await update_task_state( + db, + project_id, + task_id, + requested_user_id, + "Request accepted for mapping", + State.REQUEST_FOR_MAPPING, + State.LOCKED_FOR_MAPPING, + detail.updated_at, + ) + + case EventType.REJECTED: + if user_id != project["author_id"]: + raise HTTPException( + status_code=403, + detail="Only the project creator can approve the mapping.", + ) + + requested_user_id = await user_schemas.DbUser.get_requested_user_id( + db, project_id, task_id + ) + drone_operator = await user_schemas.DbUser.get_user_by_id( + db, requested_user_id + ) + html_content = render_email_template( + folder_name="mapping", + template_name="approved_or_rejected.html", + context={ + "email_subject": "Mapping Request Rejected", + "email_body": "We are sorry to inform you that your mapping request has been rejected.", + "task_status": "rejected", + "name": user_data.name, + "drone_operator_name": drone_operator["name"], + "task_id": task_id, + "project_id": project_id, + "project_name": project["name"], + "description": project["description"], + }, + ) + + background_tasks.add_task( + send_notification_email, + drone_operator["email_address"], + "Task is Rejected", + html_content, + ) + + return await update_task_state( + db, + project_id, + task_id, + requested_user_id, + "Request for mapping rejected", + State.REQUEST_FOR_MAPPING, + State.UNLOCKED_TO_MAP, + detail.updated_at, + ) + case EventType.FINISH: + return await update_task_state( + db, + project_id, + task_id, + user_id, + "Done: unlocked to validate", + State.LOCKED_FOR_MAPPING, + State.UNLOCKED_TO_VALIDATE, + detail.updated_at, + ) + case EventType.VALIDATE: + return update_task_state( + db, + project_id, + task_id, + user_id, + "Done: locked for validation", + State.UNLOCKED_TO_VALIDATE, + State.LOCKED_FOR_VALIDATION, + detail.updated_at, + ) + case EventType.GOOD: + return await update_task_state( + db, + project_id, + task_id, + user_id, + "Done: Task is Good", + State.LOCKED_FOR_VALIDATION, + State.UNLOCKED_DONE, + detail.updated_at, + ) + + case EventType.BAD: + return await update_task_state( + db, + project_id, + task_id, + user_id, + "Done: needs to redo", + State.LOCKED_FOR_VALIDATION, + State.UNLOCKED_TO_MAP, + detail.updated_at, + ) + case EventType.COMMENT: + return await update_task_state( + db, + project_id, + task_id, + user_id, + detail.comment, + State.LOCKED_FOR_MAPPING, + State.UNFLYABLE_TASK, + detail.updated_at, + ) + + case EventType.UNLOCK: + # Fetch the task state + current_task_state = await get_task_state(db, project_id, task_id) + + state = current_task_state.get("state") + locked_user_id = current_task_state.get("user_id") + + # Determine error conditions + if state != State.LOCKED_FOR_MAPPING.name: + raise HTTPException( + status_code=400, + detail="Task state does not match expected state for unlock operation.", + ) + if user_id != locked_user_id: + raise HTTPException( + status_code=403, + detail="You cannot unlock this task as it is locked by another user.", + ) + + # Proceed with unlocking the task + return await update_task_state( + db, + project_id, + task_id, + user_id, + f"Task has been unlock by user {user_data.name}.", + State.LOCKED_FOR_MAPPING, + State.UNLOCKED_TO_MAP, + detail.updated_at, + ) + + case EventType.IMAGE_UPLOAD: + current_task_state = await get_task_state(db, project_id, task_id) + if not current_task_state: + raise HTTPException( + status_code=400, detail="Task is not ready for image upload." + ) + state = current_task_state.get("state") + locked_user_id = current_task_state.get("user_id") + + # Determine error conditions: Current State must be IMAGE_UPLOADED or IMAGE_PROCESSING_FAILED or lokec for mapping. + if state not in ( + State.IMAGE_UPLOADED.name, + State.IMAGE_PROCESSING_FAILED.name, + State.LOCKED_FOR_MAPPING.name, + ): + raise HTTPException( + status_code=400, + detail="Task state does not match expected state for image upload.", + ) + + if user_id != locked_user_id: + raise HTTPException( + status_code=403, + detail="You cannot upload an image for this task as it is locked by another user.", + ) + + return await update_task_state( + db, + project_id, + task_id, + user_id, + f"Task image uploaded by user {user_data.name}.", + State[state], + State.IMAGE_UPLOADED, + detail.updated_at, + ) + + return True diff --git a/src/backend/app/tasks/task_routes.py b/src/backend/app/tasks/task_routes.py index 4581597b..db96a2c5 100644 --- a/src/backend/app/tasks/task_routes.py +++ b/src/backend/app/tasks/task_routes.py @@ -1,17 +1,13 @@ import uuid from typing import Annotated from app.projects import project_deps, project_schemas -from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException +from fastapi import APIRouter, BackgroundTasks, Depends from app.config import settings -from app.models.enums import EventType, HTTPStatus, State, UserRole from app.tasks import task_schemas, task_logic from app.users.user_deps import login_required from app.users.user_schemas import AuthUser -from app.users import user_schemas from psycopg import Connection from app.db import database -from app.utils import send_notification_email, render_email_template -from psycopg.rows import dict_row from loguru import logger as log router = APIRouter( @@ -28,66 +24,7 @@ async def read_task( user_data: AuthUser = Depends(login_required), ): "Retrieve details of a specific task by its ID." - try: - async with db.cursor(row_factory=dict_row) as cur: - await cur.execute( - """ - SELECT - ST_Area(ST_Transform(tasks.outline, 3857)) / 1000000 AS task_area, - - -- Construct the outline as a GeoJSON Feature - jsonb_build_object( - 'type', 'Feature', - 'geometry', jsonb_build_object( - 'type', ST_GeometryType(tasks.outline)::text, -- Get the type of the geometry (e.g., Polygon, MultiPolygon) - 'coordinates', ST_AsGeoJSON(tasks.outline, 8)::jsonb->'coordinates' -- Get the geometry coordinates - ), - 'properties', jsonb_build_object( - 'id', tasks.id, - 'bbox', jsonb_build_array( -- Build the bounding box - ST_XMin(ST_Envelope(tasks.outline)), - ST_YMin(ST_Envelope(tasks.outline)), - ST_XMax(ST_Envelope(tasks.outline)), - ST_YMax(ST_Envelope(tasks.outline)) - ) - ), - 'id', tasks.id - ) AS outline, - - te.created_at, - te.updated_at, - te.state, - projects.name AS project_name, - tasks.project_task_index, - projects.front_overlap AS front_overlap, - projects.side_overlap AS side_overlap, - projects.gsd_cm_px AS gsd_cm_px, - projects.gimble_angles_degrees AS gimble_angles_degrees - - FROM ( - SELECT DISTINCT ON (te.task_id) - te.task_id, - te.created_at, - te.updated_at, - te.state - FROM task_events te - WHERE te.task_id = %(task_id)s - ORDER BY te.task_id, te.created_at DESC - ) AS te - JOIN tasks ON te.task_id = tasks.id - JOIN projects ON tasks.project_id = projects.id - WHERE te.task_id = %(task_id)s; - """, - {"task_id": task_id}, - ) - records = await cur.fetchone() - return records - - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail=f"Failed to fetch task. {e}", - ) + return await task_schemas.TaskDetailsOut.get_task_details(db, task_id) @router.get("/statistics/") @@ -96,47 +33,7 @@ async def get_task_stats( user_data: AuthUser = Depends(login_required), ): "Retrieve statistics related to tasks for the authenticated user." - user_id = user_data.id - try: - async with db.cursor(row_factory=dict_row) as cur: - raw_sql = """ - SELECT - COUNT(CASE WHEN te.state = 'REQUEST_FOR_MAPPING' THEN 1 END) AS request_logs, - COUNT(CASE WHEN te.state IN ('LOCKED_FOR_MAPPING', 'IMAGE_UPLOADED', 'IMAGE_PROCESSING_FAILED') THEN 1 END) AS ongoing_tasks, - COUNT(CASE WHEN te.state = 'IMAGE_PROCESSED' THEN 1 END) AS completed_tasks, - COUNT(CASE WHEN te.state = 'UNFLYABLE_TASK' THEN 1 END) AS unflyable_tasks - - FROM ( - SELECT DISTINCT ON (te.task_id) - te.task_id, - te.state, - te.created_at - FROM task_events te - WHERE - ( - %(role)s = 'DRONE_PILOT' - AND te.user_id = %(user_id)s - ) - OR - (%(role)s = 'PROJECT_CREATOR' AND te.project_id IN ( - SELECT p.id - FROM projects p - WHERE p.author_id = %(user_id)s - )) - ORDER BY te.task_id, te.created_at DESC - ) AS te; - """ - - await cur.execute(raw_sql, {"user_id": user_id, "role": user_data.role}) - db_counts = await cur.fetchone() - - return db_counts - - except Exception as e: - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail=f"Failed to fetch task statistics. {e}", - ) + return await task_logic.get_task_stats(db, user_data) @router.get("/", response_model=list[task_schemas.UserTasksStatsOut]) @@ -178,287 +75,14 @@ async def new_event( user_id = user_data.id project = project.model_dump() user_role = user_data.role - - match detail.event: - case EventType.REQUESTS: - # Determine the appropriate state and message - is_author = project["author_id"] == user_id - if user_role != UserRole.DRONE_PILOT.name and not is_author: - raise HTTPException( - status_code=403, - detail="Only the project author or drone operators can request tasks for this project.", - ) - - requires_approval = project["requires_approval_from_manager_for_locking"] - - if is_author or not requires_approval: - state_after = State.LOCKED_FOR_MAPPING - message = "Request accepted automatically" + ( - " as the author" if is_author else "" - ) - else: - state_after = State.REQUEST_FOR_MAPPING - message = "Request for mapping" - - # Perform the mapping request - data = await task_logic.request_mapping( - db, - project_id, - task_id, - user_id, - message, - State.UNLOCKED_TO_MAP, - state_after, - detail.updated_at, - ) - # Send email notification if approval is required - if state_after == State.REQUEST_FOR_MAPPING: - author = await user_schemas.DbUser.get_user_by_id( - db, project["author_id"] - ) - html_content = render_email_template( - folder_name="mapping", - template_name="requests.html", - context={ - "name": author["name"], - "drone_operator_name": user_data.name, - "task_id": task_id, - "project_id": project_id, - "project_name": project["name"], - "description": project["description"], - "FRONTEND_URL": settings.FRONTEND_URL, - }, - ) - background_tasks.add_task( - send_notification_email, - author["email_address"], - "Request for mapping", - html_content, - ) - - return data - - case EventType.MAP: - if user_id != project["author_id"]: - raise HTTPException( - status_code=403, - detail="Only the project creator can approve the mapping.", - ) - - requested_user_id = await user_schemas.DbUser.get_requested_user_id( - db, project_id, task_id - ) - drone_operator = await user_schemas.DbUser.get_user_by_id( - db, requested_user_id - ) - html_content = render_email_template( - folder_name="mapping", - template_name="approved_or_rejected.html", - context={ - "email_subject": "Mapping Request Approved", - "email_body": "We are pleased to inform you that your mapping request has been approved. Your contribution is invaluable to our efforts in improving humanitarian responses worldwide.", - "task_status": "approved", - "name": user_data.name, - "drone_operator_name": drone_operator["name"], - "task_id": task_id, - "project_id": project_id, - "project_name": project["name"], - "description": project["description"], - "FRONTEND_URL": settings.FRONTEND_URL, - }, - ) - - background_tasks.add_task( - send_notification_email, - drone_operator["email_address"], - "Task is approved", - html_content, - ) - - return await task_logic.update_task_state( - db, - project_id, - task_id, - requested_user_id, - "Request accepted for mapping", - State.REQUEST_FOR_MAPPING, - State.LOCKED_FOR_MAPPING, - detail.updated_at, - ) - - case EventType.REJECTED: - if user_id != project["author_id"]: - raise HTTPException( - status_code=403, - detail="Only the project creator can approve the mapping.", - ) - - requested_user_id = await user_schemas.DbUser.get_requested_user_id( - db, project_id, task_id - ) - drone_operator = await user_schemas.DbUser.get_user_by_id( - db, requested_user_id - ) - html_content = render_email_template( - folder_name="mapping", - template_name="approved_or_rejected.html", - context={ - "email_subject": "Mapping Request Rejected", - "email_body": "We are sorry to inform you that your mapping request has been rejected.", - "task_status": "rejected", - "name": user_data.name, - "drone_operator_name": drone_operator["name"], - "task_id": task_id, - "project_id": project_id, - "project_name": project["name"], - "description": project["description"], - }, - ) - - background_tasks.add_task( - send_notification_email, - drone_operator["email_address"], - "Task is Rejected", - html_content, - ) - - return await task_logic.update_task_state( - db, - project_id, - task_id, - requested_user_id, - "Request for mapping rejected", - State.REQUEST_FOR_MAPPING, - State.UNLOCKED_TO_MAP, - detail.updated_at, - ) - case EventType.FINISH: - return await task_logic.update_task_state( - db, - project_id, - task_id, - user_id, - "Done: unlocked to validate", - State.LOCKED_FOR_MAPPING, - State.UNLOCKED_TO_VALIDATE, - detail.updated_at, - ) - case EventType.VALIDATE: - return task_logic.update_task_state( - db, - project_id, - task_id, - user_id, - "Done: locked for validation", - State.UNLOCKED_TO_VALIDATE, - State.LOCKED_FOR_VALIDATION, - detail.updated_at, - ) - case EventType.GOOD: - return await task_logic.update_task_state( - db, - project_id, - task_id, - user_id, - "Done: Task is Good", - State.LOCKED_FOR_VALIDATION, - State.UNLOCKED_DONE, - detail.updated_at, - ) - - case EventType.BAD: - return await task_logic.update_task_state( - db, - project_id, - task_id, - user_id, - "Done: needs to redo", - State.LOCKED_FOR_VALIDATION, - State.UNLOCKED_TO_MAP, - detail.updated_at, - ) - case EventType.COMMENT: - return await task_logic.update_task_state( - db, - project_id, - task_id, - user_id, - detail.comment, - State.LOCKED_FOR_MAPPING, - State.UNFLYABLE_TASK, - detail.updated_at, - ) - - case EventType.UNLOCK: - # Fetch the task state - current_task_state = await task_logic.get_task_state( - db, project_id, task_id - ) - - state = current_task_state.get("state") - locked_user_id = current_task_state.get("user_id") - - # Determine error conditions - if state != State.LOCKED_FOR_MAPPING.name: - raise HTTPException( - status_code=400, - detail="Task state does not match expected state for unlock operation.", - ) - if user_id != locked_user_id: - raise HTTPException( - status_code=403, - detail="You cannot unlock this task as it is locked by another user.", - ) - - # Proceed with unlocking the task - return await task_logic.update_task_state( - db, - project_id, - task_id, - user_id, - f"Task has been unlock by user {user_data.name}.", - State.LOCKED_FOR_MAPPING, - State.UNLOCKED_TO_MAP, - detail.updated_at, - ) - - case EventType.IMAGE_UPLOAD: - current_task_state = await task_logic.get_task_state( - db, project_id, task_id - ) - if not current_task_state: - raise HTTPException( - status_code=400, detail="Task is not ready for image upload." - ) - state = current_task_state.get("state") - locked_user_id = current_task_state.get("user_id") - - # Determine error conditions: Current State must be IMAGE_UPLOADED or IMAGE_PROCESSING_FAILED or lokec for mapping. - if state not in ( - State.IMAGE_UPLOADED.name, - State.IMAGE_PROCESSING_FAILED.name, - State.LOCKED_FOR_MAPPING.name, - ): - raise HTTPException( - status_code=400, - detail="Task state does not match expected state for image upload.", - ) - - if user_id != locked_user_id: - raise HTTPException( - status_code=403, - detail="You cannot upload an image for this task as it is locked by another user.", - ) - - return await task_logic.update_task_state( - db, - project_id, - task_id, - user_id, - f"Task image uploaded by user {user_data.name}.", - State[state], - State.IMAGE_UPLOADED, - detail.updated_at, - ) - - return True + return await task_logic.handle_event( + db, + project_id, + task_id, + user_id, + project, + user_role, + detail, + user_data, + background_tasks, + ) diff --git a/src/backend/app/tasks/task_schemas.py b/src/backend/app/tasks/task_schemas.py index 28b2fbfd..16c43576 100644 --- a/src/backend/app/tasks/task_schemas.py +++ b/src/backend/app/tasks/task_schemas.py @@ -6,7 +6,25 @@ from loguru import logger as log from fastapi import HTTPException from psycopg.rows import class_row, dict_row -from typing import Optional +from typing import List, Literal, Optional +from pydantic.functional_validators import field_validator + + +class Geometry(BaseModel): + type: Literal["ST_Polygon"] + coordinates: List[List[List[float]]] + + +class Properties(BaseModel): + id: uuid.UUID + bbox: List[float] + + +class Outline(BaseModel): + id: uuid.UUID + type: Literal["Feature"] + geometry: Geometry + properties: Properties class NewEvent(BaseModel): @@ -204,3 +222,104 @@ async def get_tasks_by_user( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Retrieval failed", ) from e + + +class TaskDetailsOut(BaseModel): + task_area: float + outline: Outline + created_at: datetime + updated_at: datetime + state: State + project_name: str + project_task_index: int + front_overlap: float + side_overlap: float + gsd_cm_px: float + gimble_angles_degrees: Optional[int] = None + + @field_validator("state", mode="after") + @classmethod + def integer_state_to_string(cls, value: State): + if isinstance(value, str): + value = value.name + + if isinstance(value, int): + value = State(value).name + return value + + @field_validator("state", mode="before") + @classmethod + def srting_state_to_integer(cls, value: State) -> str: + if isinstance(value, str): + value = State[value.strip()].value + return value + + @staticmethod + async def get_task_details(db: Connection, task_id: uuid.UUID): + try: + async with db.cursor(row_factory=class_row(TaskDetailsOut)) as cur: + await cur.execute( + """ + SELECT + ST_Area(ST_Transform(tasks.outline, 3857)) / 1000000 AS task_area, + + -- Construct the outline as a GeoJSON Feature + jsonb_build_object( + 'type', 'Feature', + 'geometry', jsonb_build_object( + 'type', ST_GeometryType(tasks.outline)::text, -- Get the type of the geometry (e.g., Polygon, MultiPolygon) + 'coordinates', ST_AsGeoJSON(tasks.outline, 8)::jsonb->'coordinates' -- Get the geometry coordinates + ), + 'properties', jsonb_build_object( + 'id', tasks.id, + 'bbox', jsonb_build_array( -- Build the bounding box + ST_XMin(ST_Envelope(tasks.outline)), + ST_YMin(ST_Envelope(tasks.outline)), + ST_XMax(ST_Envelope(tasks.outline)), + ST_YMax(ST_Envelope(tasks.outline)) + ) + ), + 'id', tasks.id + ) AS outline, + + te.created_at, + te.updated_at, + te.state, + projects.name AS project_name, + tasks.project_task_index, + projects.front_overlap AS front_overlap, + projects.side_overlap AS side_overlap, + projects.gsd_cm_px AS gsd_cm_px, + projects.gimble_angles_degrees AS gimble_angles_degrees + + FROM ( + SELECT DISTINCT ON (te.task_id) + te.task_id, + te.created_at, + te.updated_at, + te.state + FROM task_events te + WHERE te.task_id = %(task_id)s + ORDER BY te.task_id, te.created_at DESC + ) AS te + JOIN tasks ON te.task_id = tasks.id + JOIN projects ON tasks.project_id = projects.id + WHERE te.task_id = %(task_id)s; + """, + {"task_id": task_id}, + ) + records = await cur.fetchone() + return records + + except Exception as e: + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=f"Failed to fetch task. {e}", + ) + + +class TaskStats(BaseModel): + request_logs: int + ongoing_tasks: int + completed_tasks: int + unflyable_tasks: int