diff --git a/backoffice/backoffice/workflows/api/serializers.py b/backoffice/backoffice/workflows/api/serializers.py index b774391a..402736d6 100644 --- a/backoffice/backoffice/workflows/api/serializers.py +++ b/backoffice/backoffice/workflows/api/serializers.py @@ -1,7 +1,8 @@ from django_elasticsearch_dsl_drf.serializers import DocumentSerializer +from drf_spectacular.utils import OpenApiExample, extend_schema_serializer from rest_framework import serializers -from backoffice.workflows.constants import ResolutionDags +from backoffice.workflows.constants import ResolutionDags, WorkflowType from backoffice.workflows.documents import WorkflowDocument from backoffice.workflows.models import Workflow, WorkflowTicket @@ -24,6 +25,53 @@ class Meta: fields = "__all__" +@extend_schema_serializer( + exclude_fields=[ + "_created_at", + "_updated_at", + ], # Exclude internal fields from schema + examples=[ + OpenApiExample( + "Author Workflow Serializer", + summary="Author Workflow Serializer no data", + description="Author Workflow Serializer", + value={ + "workflow_type": "AUTHOR_CREATE", + "status": "running", + "core": False, + "is_update": False, + "data": {}, + }, + ), + ], +) +class WorkflowAuthorSerializer(WorkflowSerializer): + def validate_workflow_type(self, value): + allowed_workflow_types = [ + WorkflowType.AUTHOR_CREATE, + WorkflowType.AUTHOR_UPDATE, + ] + if value not in allowed_workflow_types: + raise serializers.ValidationError( + f"The field `workflow_type` should be on of {allowed_workflow_types}" + ) + return value + + +@extend_schema_serializer( + examples=[ + OpenApiExample( + "Accept", + description="Author Workflow Serializer", + value={"value": "accept", "create_ticket": False}, + ), + OpenApiExample( + "Reject", + description="Author Workflow Serializer", + value={"value": "reject", "create_ticket": False}, + ), + ], +) class AuthorResolutionSerializer(serializers.Serializer): value = serializers.ChoiceField(choices=ResolutionDags) create_ticket = serializers.BooleanField(default=False) diff --git a/backoffice/backoffice/workflows/api/views.py b/backoffice/backoffice/workflows/api/views.py index 6e4a1b77..05822cf1 100644 --- a/backoffice/backoffice/workflows/api/views.py +++ b/backoffice/backoffice/workflows/api/views.py @@ -9,6 +9,13 @@ OrderingFilterBackend, ) from django_elasticsearch_dsl_drf.viewsets import BaseDocumentViewSet +from drf_spectacular.utils import ( + OpenApiExample, + OpenApiParameter, + OpenApiTypes, + extend_schema, + extend_schema_view, +) from opensearch_dsl import TermsFacet from rest_framework import status, viewsets from rest_framework.decorators import action @@ -18,6 +25,7 @@ from backoffice.workflows import airflow_utils from backoffice.workflows.api.serializers import ( AuthorResolutionSerializer, + WorkflowAuthorSerializer, WorkflowDocumentSerializer, WorkflowSerializer, WorkflowTicketSerializer, @@ -40,19 +48,6 @@ def get_queryset(self): return self.queryset -class WorkflowPartialUpdateViewSet(viewsets.ViewSet): - def partial_update(self, request, pk=None): - workflow_instance = get_object_or_404(Workflow, pk=pk) - serializer = WorkflowSerializer( - workflow_instance, data=request.data, partial=True - ) - - if serializer.is_valid(): - serializer.save() - return Response(serializer.data) - return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) - - class WorkflowTicketViewSet(viewsets.ViewSet): def retrieve(self, request, *args, **kwargs): workflow_id = kwargs.get("pk") @@ -101,8 +96,13 @@ def create(self, request, *args, **kwargs): class AuthorWorkflowViewSet(viewsets.ViewSet): - serializer_class = WorkflowSerializer + serializer_class = WorkflowAuthorSerializer + @extend_schema( + summary="Create a new Author", + description="Creates a new author, launches the required airflow dags.", + request=serializer_class, + ) def create(self, request): logger.info("Creating workflow with data: %s", request.data) serializer = self.serializer_class(data=request.data) @@ -122,6 +122,35 @@ def create(self, request): workflow.data, ) + @extend_schema( + summary="Partially Updates author", + description="Updates specific fields of the author.", + examples=[ + OpenApiExample( + "Status Update", + value={ + "status": "completed", + }, + ), + ], + ) + def partial_update(self, request, pk=None): + logger.info("Updating workflow with data: %s", request.data) + workflow_instance = get_object_or_404(Workflow, pk=pk) + serializer = self.serializer_class( + workflow_instance, data=request.data, partial=True + ) + + if serializer.is_valid(): + serializer.save() + return Response(serializer.data) + return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + + @extend_schema( + summary="Accept or Reject Author", + description="Acceps or rejects an author, run associated dags.", + request=AuthorResolutionSerializer, + ) @action(detail=True, methods=["post"]) def resolve(self, request, pk=None): logger.info("Resolving data: %s", request.data) @@ -138,6 +167,24 @@ def resolve(self, request, pk=None): ResolutionDags[serializer.validated_data["value"]].label, pk, extra_data ) + @extend_schema( + summary="Restart an Author Workflow", + description="Restart an Author Workflow.", + examples=[ + OpenApiExample( + "Restart Whole Workflow", + value={}, + ), + OpenApiExample( + "Restart Failing Task", + value={"restart_current_task": True}, + ), + OpenApiExample( + "Restart Workflow with Custom Parameters", + value={"params": {}}, + ), + ], + ) @action(detail=True, methods=["post"]) def restart(self, request, pk=None): workflow = Workflow.objects.get(id=pk) @@ -150,6 +197,42 @@ def restart(self, request, pk=None): ) +@extend_schema_view( + list=extend_schema( + summary="Search with opensearch", + description="text", + parameters=[ + OpenApiParameter( + name="search", + description="Filter books by title", + required=False, + type=OpenApiTypes.STR, + location=OpenApiParameter.QUERY, + ), + OpenApiParameter( + name="ordering", + description="Filter books by author", + required=False, + type=OpenApiTypes.STR, + location=OpenApiParameter.QUERY, + ), + OpenApiParameter( + name="status", + description="status", + required=False, + type=OpenApiTypes.STR, + location=OpenApiParameter.QUERY, + ), + OpenApiParameter( + name="workflow_type", + description="workflow_type", + required=False, + type=OpenApiTypes.STR, + location=OpenApiParameter.QUERY, + ), + ], + ), +) class WorkflowDocumentView(BaseDocumentViewSet): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -171,7 +254,10 @@ def __init__(self, *args, **kwargs): "is_update", } - filter_fields = {"status": "status", "workflow_type": "workflow_type"} + filter_fields = { + "status": "status.keyword", + "workflow_type": "workflow_type.keyword", + } ordering_fields = {"_updated_at": "_updated_at"} diff --git a/backoffice/backoffice/workflows/tests/test_views.py b/backoffice/backoffice/workflows/tests/test_views.py index d3128c3a..dd1b939d 100644 --- a/backoffice/backoffice/workflows/tests/test_views.py +++ b/backoffice/backoffice/workflows/tests/test_views.py @@ -427,7 +427,7 @@ def test_search_workflow_type(self): def test_filter_status(self): self.api_client.force_authenticate(user=self.admin) - url = reverse("search:workflow-list") + f'?status="={StatusChoices.RUNNING}' + url = reverse("search:workflow-list") + f"?status={StatusChoices.RUNNING}" response = self.api_client.get(url) diff --git a/backoffice/config/api_router.py b/backoffice/config/api_router.py index 3969a7c2..179a686b 100644 --- a/backoffice/config/api_router.py +++ b/backoffice/config/api_router.py @@ -4,7 +4,6 @@ from backoffice.users.api.views import UserViewSet from backoffice.workflows.api.views import ( AuthorWorkflowViewSet, - WorkflowPartialUpdateViewSet, WorkflowTicketViewSet, WorkflowViewSet, ) @@ -20,9 +19,6 @@ ), ) router.register("workflows", WorkflowViewSet, basename="workflows") -router.register( - "workflow-update", WorkflowPartialUpdateViewSet, basename="workflow-update" -) (router.register("workflow-ticket", WorkflowTicketViewSet, basename="workflow-ticket"),) app_name = "api" urlpatterns = router.urls diff --git a/docker-compose.yaml b/docker-compose.yaml index 1cfca714..6e7bee6a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -61,6 +61,7 @@ x-airflow-common: &airflow-common AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true" AIRFLOW__CORE__LOAD_EXAMPLES: "false" AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session" + AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE: "true" # yamllint disable rule:line-length # Use simple http server on scheduler for health checks # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server diff --git a/workflows/dags/author/author_create/author_create_approved.py b/workflows/dags/author/author_create/author_create_approved.py index 75766a0f..8a322efe 100644 --- a/workflows/dags/author/author_create/author_create_approved.py +++ b/workflows/dags/author/author_create/author_create_approved.py @@ -4,7 +4,7 @@ from airflow.decorators import dag, task from airflow.models.param import Param from airflow.utils.trigger_rule import TriggerRule -from hooks.backoffice.workflow_management_hook import WorkflowManagementHook +from hooks.backoffice.workflow_management_hook import AUTHOR, WorkflowManagementHook from hooks.backoffice.workflow_ticket_management_hook import ( WorkflowTicketManagementHook, ) @@ -55,7 +55,9 @@ def author_create_approved_dag(): def set_workflow_status_to_running(**context): status_name = "running" workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) @task.branch() @@ -130,7 +132,9 @@ def close_author_create_user_ticket(**context: dict) -> None: def set_author_create_workflow_status_to_completed(**context: dict) -> None: status_name = "completed" workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) @task @@ -144,7 +148,9 @@ def set_author_create_workflow_status_to_error(**context: dict) -> None: status_name = ti.xcom_pull(task_ids="create_author_on_inspire") logger.info(f"Workflow status: {status_name}") workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) # task definitions diff --git a/workflows/dags/author/author_create/author_create_init.py b/workflows/dags/author/author_create/author_create_init.py index fdd40b7c..6eb3dfe1 100644 --- a/workflows/dags/author/author_create/author_create_init.py +++ b/workflows/dags/author/author_create/author_create_init.py @@ -3,7 +3,7 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from hooks.backoffice.workflow_management_hook import WorkflowManagementHook +from hooks.backoffice.workflow_management_hook import AUTHOR, WorkflowManagementHook from hooks.backoffice.workflow_ticket_management_hook import ( WorkflowTicketManagementHook, ) @@ -42,8 +42,13 @@ def author_create_initialization_dag(): @task() def set_workflow_status_to_running(**context): status_name = "running" + print("amazing") + print(context["params"]) + print(context) workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) @task() @@ -52,6 +57,7 @@ def set_schema(**context): workflow_management_hook.partial_update_workflow( workflow_id=context["params"]["workflow_id"], workflow_partial_update_data={"data": {"$schema": schema}}, + typ=AUTHOR, ) @task() @@ -80,7 +86,9 @@ def create_author_create_user_ticket(**context: dict) -> None: def set_author_create_workflow_status_to_approval(**context: dict) -> None: status_name = "approval" workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) # task dependencies diff --git a/workflows/dags/author/author_create/author_create_rejected.py b/workflows/dags/author/author_create/author_create_rejected.py index 7e012e7b..507ee533 100644 --- a/workflows/dags/author/author_create/author_create_rejected.py +++ b/workflows/dags/author/author_create/author_create_rejected.py @@ -2,7 +2,7 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from hooks.backoffice.workflow_management_hook import WorkflowManagementHook +from hooks.backoffice.workflow_management_hook import AUTHOR, WorkflowManagementHook from hooks.backoffice.workflow_ticket_management_hook import ( WorkflowTicketManagementHook, ) @@ -49,14 +49,18 @@ def close_author_create_user_ticket(**context: dict) -> None: def set_author_create_workflow_status_to_completed(**context: dict) -> None: status_name = "completed" workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) @task() def set_workflow_status_to_running(**context): status_name = "running" workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) # task definitions diff --git a/workflows/dags/author/author_update/author_update.py b/workflows/dags/author/author_update/author_update.py index a4f0a2e7..db521125 100644 --- a/workflows/dags/author/author_update/author_update.py +++ b/workflows/dags/author/author_update/author_update.py @@ -2,7 +2,7 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from hooks.backoffice.workflow_management_hook import WorkflowManagementHook +from hooks.backoffice.workflow_management_hook import AUTHOR, WorkflowManagementHook from hooks.backoffice.workflow_ticket_management_hook import ( WorkflowTicketManagementHook, ) @@ -46,7 +46,9 @@ def author_update_dag(): def set_author_update_workflow_status_to_running(**context): status_name = "running" workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) @task() @@ -92,7 +94,9 @@ def update_author_on_inspire(**context): def set_author_update_workflow_status_to_completed(**context): status_name = "completed" workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) @task.branch(provide_context=True) @@ -110,7 +114,9 @@ def set_author_update_workflow_status_to_error(**context): ti = context["ti"] status_name = ti.xcom_pull(task_ids="update_author_on_inspire") workflow_management_hook.set_workflow_status( - status_name=status_name, workflow_id=context["params"]["workflow_id"] + status_name=status_name, + workflow_id=context["params"]["workflow_id"], + typ=AUTHOR, ) # task definitions diff --git a/workflows/plugins/hooks/backoffice/workflow_management_hook.py b/workflows/plugins/hooks/backoffice/workflow_management_hook.py index 72519fd8..50774d35 100644 --- a/workflows/plugins/hooks/backoffice/workflow_management_hook.py +++ b/workflows/plugins/hooks/backoffice/workflow_management_hook.py @@ -1,6 +1,10 @@ from hooks.backoffice.base import BackofficeHook from requests import Response +AUTHOR = "aut" +HEP = "hep" +WORKFLOW_COLLECTIONS = {AUTHOR: "authors", HEP: "hep"} + class WorkflowManagementHook(BackofficeHook): """ @@ -13,7 +17,9 @@ class WorkflowManagementHook(BackofficeHook): :type http_conn_id: str """ - def set_workflow_status(self, status_name: str, workflow_id: str) -> Response: + def set_workflow_status( + self, status_name: str, workflow_id: str, typ: str + ) -> Response: """ Updates the status of a workflow in the backoffice system. @@ -21,12 +27,13 @@ def set_workflow_status(self, status_name: str, workflow_id: str) -> Response: :type status: str :param workflow_id: The ID of the workflow to update. :type workflow_id: str + :type typ: str - either authors or hep """ request_data = { "status": status_name, } return self.partial_update_workflow( - workflow_partial_update_data=request_data, workflow_id=workflow_id + workflow_partial_update_data=request_data, workflow_id=workflow_id, typ=typ ) def get_workflow(self, workflow_id: str) -> dict: @@ -47,9 +54,9 @@ def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response: ) def partial_update_workflow( - self, workflow_id: str, workflow_partial_update_data: dict + self, workflow_id: str, workflow_partial_update_data: dict, typ: str ) -> Response: - endpoint = f"api/workflow-update/{workflow_id}/" + endpoint = f"api/workflows/{WORKFLOW_COLLECTIONS[typ]}/{workflow_id}/" return self.run_with_advanced_retry( _retry_args=self.tenacity_retry_kwargs, method="PATCH", diff --git a/workflows/plugins/include/utils/set_workflow_status.py b/workflows/plugins/include/utils/set_workflow_status.py index db6326f3..7a6ab900 100644 --- a/workflows/plugins/include/utils/set_workflow_status.py +++ b/workflows/plugins/include/utils/set_workflow_status.py @@ -37,7 +37,7 @@ def set_workflow_status_to_error(context: dict) -> None: """ logger.info("Setting workflow status to error") response = WorkflowManagementHook().set_workflow_status( - status_name="error", workflow_id=context["params"]["workflow_id"] + status_name="error", workflow_id=context["params"]["workflow_id"], typ="authors" ) try: response.raise_for_status()