diff --git a/.envs/docker/.django b/.envs/docker/.django index cbae791d..48fbb2a6 100644 --- a/.envs/docker/.django +++ b/.envs/docker/.django @@ -17,3 +17,7 @@ CELERY_FLOWER_PASSWORD=debug # OpenSearch OPENSEARCH_HOST=opensearch:9200 OPENSEARCH_INDEX_PREFIX=backoffice-backend-local + +# Airflow +AIRFLOW_BASE_URL=http://localhost:8080 +AIRFLOW_TOKEN=CHANGE_ME diff --git a/.envs/local/.django b/.envs/local/.django index c0ba6588..6674841e 100644 --- a/.envs/local/.django +++ b/.envs/local/.django @@ -17,3 +17,7 @@ CELERY_FLOWER_PASSWORD=debug # Opensearch OPENSEARCH_HOST=opensearch:9200 OPENSEARCH_INDEX_PREFIX=backoffice-backend-local + +# Airflow +AIRFLOW_BASE_URL=http://host.docker.internal:8082 +AIRFLOW_TOKEN=CHANGE_ME diff --git a/backoffice/workflows/airflow_utils.py b/backoffice/workflows/airflow_utils.py index 95dd41a3..00952c2a 100644 --- a/backoffice/workflows/airflow_utils.py +++ b/backoffice/workflows/airflow_utils.py @@ -1,39 +1,34 @@ -import requests from os import environ + +import requests from django.http import JsonResponse -from requests.exceptions import HTTPError, RequestException +from requests.exceptions import RequestException +from rest_framework import status + +AIRFLOW_BASE_URL = environ.get("AIRFLOW_BASE_URL") + +AIRFLOW_HEADERS = {"Content-Type": "application/json", "Authorization": f"Basic {environ.get('AIRFLOW_TOKEN')}"} -AIRFLOW_BASE_URL = environ.get('AIRFLOW_BASE_URL') -AIRFLOW_HEADERS = { - "Content-Type": "application/json", - "Authorization": f"Basic {environ.get('AIRFLOW_TOKEN')}" -} +def trigger_airflow_dag(dag_id, workflow_id, extra_data=None): + """Triggers an airflow dag. -def trigger_airflow_dag(dag_id,workflow_id, extra_data = None): - """ triggers an airflow dag :param dag_id: name of the dag to run :param workflow_id: id of the workflow being triggered - :return request response""" + :returns: request response + """ - data = { - "dag_run_id": workflow_id, - "conf": - { - "workflow_id": workflow_id - } - } + data = {"dag_run_id": workflow_id, "conf": {"workflow_id": workflow_id}} if extra_data is not None: data["conf"].update(extra_data) - url = f'{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns' + url = f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns" try: - response = requests.post(url, json=data, headers=AIRFLOW_HEADERS, timeout=300) + response = requests.post(url, json=data, headers=AIRFLOW_HEADERS) response.raise_for_status() return JsonResponse(response.json()) - except HTTPError as http_err: - return JsonResponse({'error': f'HTTP error occurred: {http_err}'}, status=response.status_code) except RequestException as req_err: - return JsonResponse({'error': f'Request error occurred: {req_err}'}, status=500) \ No newline at end of file + data = {"error": req_err} + return JsonResponse(data, status=status.HTTP_500_INTERNAL_SERVER_ERROR) diff --git a/backoffice/workflows/api/serializers.py b/backoffice/workflows/api/serializers.py index 179467cd..726f532b 100644 --- a/backoffice/workflows/api/serializers.py +++ b/backoffice/workflows/api/serializers.py @@ -4,6 +4,8 @@ from backoffice.workflows.documents import WorkflowDocument from backoffice.workflows.models import Workflow, WorkflowTicket +from ..constants import ResolutionDags + class WorkflowSerializer(serializers.ModelSerializer): class Meta: @@ -21,3 +23,8 @@ class WorkflowDocumentSerializer(DocumentSerializer): class Meta: document = WorkflowDocument fields = "__all__" + + +class AuthorResolutionSerializer(serializers.Serializer): + value = serializers.ChoiceField(choices=ResolutionDags) + create_ticket = serializers.BooleanField(default=False) diff --git a/backoffice/workflows/api/views.py b/backoffice/workflows/api/views.py index 012df238..69f473d3 100644 --- a/backoffice/workflows/api/views.py +++ b/backoffice/workflows/api/views.py @@ -9,7 +9,13 @@ from backoffice.workflows.documents import WorkflowDocument from backoffice.workflows.models import Workflow, WorkflowTicket -from .serializers import WorkflowDocumentSerializer, WorkflowSerializer, WorkflowTicketSerializer +from ..constants import WORKFLOW_DAG, ResolutionDags +from .serializers import ( + AuthorResolutionSerializer, + WorkflowDocumentSerializer, + WorkflowSerializer, + WorkflowTicketSerializer, +) class WorkflowViewSet(viewsets.ModelViewSet): @@ -72,43 +78,25 @@ def create(self, request, *args, **kwargs): return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) -class WorflowSubmissionViewSet(viewsets.ViewSet): - @action(detail=False, methods=["post"]) - def submit(self, request): - - # TODO workflow submission serializer - - # create workflow entry - workflow = Workflow.objects.create( - data=request.data, status="approval", core=True, is_update=False, workflow_type="AUTHOR_CREATE" - ) - - print("Triggering dag") - # response id, corresponds to the new workflow id - response = airflow_utils.trigger_airflow_dag("author_create_initialization_dag", str(workflow.id)) - - return Response({"data": response.content, "status_code": response.status_code}, status=status.HTTP_200_OK) - - @action(detail=False, methods=["post"]) - def resolve(self, request): - - data = request.data - create_ticket = data["create_ticket"] - resolution = data["resolution"] - extra_data = {"create_ticket": create_ticket, "resolution": resolution} +class AuthorWorkflowViewSet(viewsets.ViewSet): + serializer_class = WorkflowSerializer - if resolution == "accept": - dag_name = "author_create_approved_dag" - elif resolution == "reject": - dag_name = "author_create_rejected_dag" - else: - return Response( - {"message": "resolution method unrecognized"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR + def create(self, request): + serializer = self.serializer_class(data=request.data) + if serializer.is_valid(raise_exception=True): + workflow = Workflow.objects.create( + data=serializer.validated_data["data"], workflow_type=serializer.validated_data["workflow_type"] + ) + return airflow_utils.trigger_airflow_dag(WORKFLOW_DAG[workflow.workflow_type], str(workflow.id), workflow.data) + + @action(detail=True, methods=["post"]) + def resolve(self, request, pk=None): + serializer = AuthorResolutionSerializer(data=request.data) + if serializer.is_valid(raise_exception=True): + extra_data = {"create_ticket": serializer.validated_data["create_ticket"]} + return airflow_utils.trigger_airflow_dag( + ResolutionDags[serializer.validated_data["value"]].label, pk, extra_data ) - - response = airflow_utils.trigger_airflow_dag(dag_name, data["id"], extra_data) - - return Response({"data": response.content, "status_code": response.status_code}, status=status.HTTP_200_OK) class WorkflowDocumentView(BaseDocumentViewSet): diff --git a/backoffice/workflows/constants.py b/backoffice/workflows/constants.py index 0a8de579..353b6cd4 100644 --- a/backoffice/workflows/constants.py +++ b/backoffice/workflows/constants.py @@ -1,3 +1,5 @@ +from django.db import models + # tickets TICKET_TYPES = ( ("author_create_curation", "Author create curation"), @@ -6,18 +8,36 @@ DEFAULT_TICKET_TYPE = "author_create_curation" # workflows -DEFAULT_STATUS_CHOICE = "running" -DEFAULT_WORKFLOW_TYPE = "HEP_create" -STATUS_CHOICES = ( - ("running", "Running"), - ("approval", "Waiting for approval"), - ("completed", "Completed"), - ("error", "Error"), -) -WORKFLOW_TYPES = ( - ("HEP_CREATE", "HEP create"), - ("HEP_UPDATE", "HEP update"), - ("AUTHOR_CREATE", "Author create"), - ("AUTHOR_UPDATE", "Author update"), -) + +class StatusChoices(models.TextChoices): + RUNNING = "running", "Running" + APPROVAL = "approval", "Waiting for approva" + COMPLETED = "completed", "Completed" + ERROR = "error", "Error" + + +DEFAULT_STATUS_CHOICE = StatusChoices.RUNNING + + +class WorkflowType(models.TextChoices): + HEP_CREATE = "HEP_CREATE", "HEP create" + HEP_UPDATE = "HEP_UPDATE", "HEP update" + AUTHOR_CREATE = "AUTHOR_CREATE", "Author create" + AUTHOR_UPDATE = "AUTHOR_UPDATE", "Author update" + + +DEFAULT_WORKFLOW_TYPE = WorkflowType.HEP_CREATE + +# author dags for each workflow type +WORKFLOW_DAG = { + WorkflowType.HEP_CREATE: "", + WorkflowType.HEP_UPDATE: "", + WorkflowType.AUTHOR_CREATE: "author_create_initialization_dag", + WorkflowType.AUTHOR_UPDATE: "author_update_dag", +} + + +class ResolutionDags(models.TextChoices): + accept = "accept", "author_create_approved_dag" + reject = "reject", "author_create_rejected_dag" diff --git a/backoffice/workflows/migrations/0007_alter_workflow_core_alter_workflow_is_update.py b/backoffice/workflows/migrations/0007_alter_workflow_core_alter_workflow_is_update.py new file mode 100644 index 00000000..1f0673ae --- /dev/null +++ b/backoffice/workflows/migrations/0007_alter_workflow_core_alter_workflow_is_update.py @@ -0,0 +1,22 @@ +# Generated by Django 4.2.6 on 2024-06-20 09:07 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("workflows", "0006_workflow__created_at_workflow__updated_at"), + ] + + operations = [ + migrations.AlterField( + model_name="workflow", + name="core", + field=models.BooleanField(default=False), + ), + migrations.AlterField( + model_name="workflow", + name="is_update", + field=models.BooleanField(default=False), + ), + ] diff --git a/backoffice/workflows/migrations/0008_alter_workflow_status_alter_workflow_workflow_type.py b/backoffice/workflows/migrations/0008_alter_workflow_status_alter_workflow_workflow_type.py new file mode 100644 index 00000000..e879dd0f --- /dev/null +++ b/backoffice/workflows/migrations/0008_alter_workflow_status_alter_workflow_workflow_type.py @@ -0,0 +1,40 @@ +# Generated by Django 4.2.6 on 2024-07-09 12:42 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("workflows", "0007_alter_workflow_core_alter_workflow_is_update"), + ] + + operations = [ + migrations.AlterField( + model_name="workflow", + name="status", + field=models.CharField( + choices=[ + ("running", "Running"), + ("approval", "Waiting for approva"), + ("completed", "Completed"), + ("error", "Error"), + ], + default="running", + max_length=30, + ), + ), + migrations.AlterField( + model_name="workflow", + name="workflow_type", + field=models.CharField( + choices=[ + ("HEP_CREATE", "HEP create"), + ("HEP_UPDATE", "HEP update"), + ("AUTHOR_CREATE", "Author create"), + ("AUTHOR_UPDATE", "Author update"), + ], + default="HEP_CREATE", + max_length=30, + ), + ), + ] diff --git a/backoffice/workflows/models.py b/backoffice/workflows/models.py index bd20b686..1b3b4769 100644 --- a/backoffice/workflows/models.py +++ b/backoffice/workflows/models.py @@ -6,9 +6,9 @@ DEFAULT_STATUS_CHOICE, DEFAULT_TICKET_TYPE, DEFAULT_WORKFLOW_TYPE, - STATUS_CHOICES, TICKET_TYPES, - WORKFLOW_TYPES, + StatusChoices, + WorkflowType, ) @@ -17,17 +17,17 @@ class Workflow(models.Model): workflow_type = models.CharField( max_length=30, - choices=WORKFLOW_TYPES, + choices=WorkflowType.choices, default=DEFAULT_WORKFLOW_TYPE, ) data = models.JSONField() status = models.CharField( max_length=30, - choices=STATUS_CHOICES, + choices=StatusChoices.choices, default=DEFAULT_STATUS_CHOICE, ) - core = models.BooleanField() - is_update = models.BooleanField() + core = models.BooleanField(default=False) + is_update = models.BooleanField(default=False) _created_at = models.DateTimeField(auto_now_add=True) _updated_at = models.DateTimeField(auto_now=True) diff --git a/backoffice/workflows/tests/test_views.py b/backoffice/workflows/tests/test_views.py index 81bbcbf6..4ca71b26 100644 --- a/backoffice/workflows/tests/test_views.py +++ b/backoffice/workflows/tests/test_views.py @@ -1,11 +1,16 @@ +from unittest.mock import patch + from django.apps import apps from django.contrib.auth import get_user_model from django.contrib.auth.models import Group from django.test import TransactionTestCase +from django.urls import reverse from opensearch_dsl import Index +from rest_framework import status from rest_framework.test import APIClient from backoffice.workflows.api.serializers import WorkflowTicketSerializer +from backoffice.workflows.constants import StatusChoices from backoffice.workflows.models import WorkflowTicket User = get_user_model() @@ -37,7 +42,7 @@ class TestWorkflowViewSet(BaseTransactionTestCase): def setUp(self): super().setUp() - self.workflow = Workflow.objects.create(data={}, status="approval", core=True, is_update=False) + self.workflow = Workflow.objects.create(data={}, status=StatusChoices.APPROVAL, core=True, is_update=False) def test_list_curator(self): self.api_client.force_authenticate(user=self.curator) @@ -70,7 +75,7 @@ def setUp(self): super().setUp() index = Index("backoffice-backend-test-workflows") index.delete(ignore=[400, 404]) - self.workflow = Workflow.objects.create(data={}, status="approval", core=True, is_update=False) + self.workflow = Workflow.objects.create(data={}, status=StatusChoices.APPROVAL, core=True, is_update=False) def test_list_curator(self): self.api_client.force_authenticate(user=self.curator) @@ -100,7 +105,7 @@ class TestWorkflowPartialUpdateViewSet(BaseTransactionTestCase): def setUp(self): super().setUp() - self.workflow = Workflow.objects.create(data={}, status="approval", core=True, is_update=False) + self.workflow = Workflow.objects.create(data={}, status=StatusChoices.APPROVAL, core=True, is_update=False) @property def endpoint(self): @@ -198,3 +203,66 @@ def test_create_happy_flow(self): assert "ticket_type" in response.data assert response.data == WorkflowTicketSerializer(WorkflowTicket.objects.last()).data + + +class TestAuthorWorkflowViewSet(BaseTransactionTestCase): + endpoint = "/api/authors/" + reset_sequences = True + fixtures = ["backoffice/fixtures/groups.json"] + + @patch("backoffice.workflows.airflow_utils.requests.post") + def test_create_author(self, mock_post): + self.api_client.force_authenticate(user=self.curator) + + mock_response = mock_post.return_value + mock_response.status_code = status.HTTP_200_OK + mock_response.json.return_value = {"key": "value"} + + data = { + "workflow_type": "AUTHOR_CREATE", + "status": "running", + "data": { + "native_name": "NATIVE_NAME", + "alternate_name": "NAME", + "display_name": "FIRST_NAME", + "family_name": "LAST_NAME", + "given_name": "GIVEN_NAME", + }, + } + + url = reverse("api:workflows-authors-list") + response = self.api_client.post(url, format="json", data=data) + + self.assertEqual(response.status_code, 200) + + @patch("backoffice.workflows.airflow_utils.requests.post") + def test_accept_author(self, mock_post): + self.api_client.force_authenticate(user=self.curator) + + mock_response = mock_post.return_value + mock_response.status_code = status.HTTP_200_OK + mock_response.json.return_value = {"key": "value"} + + data = {"create_ticket": True, "value": "accept"} + + response = self.api_client.post( + reverse("api:workflows-authors-resolve", kwargs={"pk": "WORKFLOW_ID"}), format="json", data=data + ) + + self.assertEqual(response.status_code, 200) + + @patch("backoffice.workflows.airflow_utils.requests.post") + def test_reject_author(self, mock_post): + self.api_client.force_authenticate(user=self.curator) + + mock_response = mock_post.return_value + mock_response.status_code = status.HTTP_200_OK + mock_response.json.return_value = {"key": "value"} + + data = {"create_ticket": True, "value": "reject"} + + response = self.api_client.post( + reverse("api:workflows-authors-resolve", kwargs={"pk": "WORKFLOW_ID"}), format="json", data=data + ) + + self.assertEqual(response.status_code, 200) diff --git a/backoffice/workflows/urls.py b/backoffice/workflows/urls.py deleted file mode 100644 index c64521d5..00000000 --- a/backoffice/workflows/urls.py +++ /dev/null @@ -1,7 +0,0 @@ -from django.urls import include, path - -from backoffice.config.api_router import router - -urlpatterns = [ - path("api/", include(router.urls)), -] diff --git a/config/api_router.py b/config/api_router.py index 6bbe0047..74452a7c 100644 --- a/config/api_router.py +++ b/config/api_router.py @@ -2,7 +2,12 @@ from rest_framework.routers import DefaultRouter, SimpleRouter from backoffice.users.api.views import UserViewSet -from backoffice.workflows.api.views import WorkflowPartialUpdateViewSet, WorkflowTicketViewSet, WorkflowViewSet +from backoffice.workflows.api.views import ( + AuthorWorkflowViewSet, + WorkflowPartialUpdateViewSet, + WorkflowTicketViewSet, + WorkflowViewSet, +) if settings.DEBUG: router = DefaultRouter() @@ -12,6 +17,7 @@ router.register("users", UserViewSet) # Workflows +router.register("workflows/authors", AuthorWorkflowViewSet, basename="workflows-authors"), router.register("workflows", WorkflowViewSet, basename="workflows") router.register("workflow-update", WorkflowPartialUpdateViewSet, basename="workflow-update") router.register("workflow-ticket", WorkflowTicketViewSet, basename="workflow-ticket"), diff --git a/config/settings/local.py b/config/settings/local.py index 0ecfc6ea..6a2f8b6a 100644 --- a/config/settings/local.py +++ b/config/settings/local.py @@ -11,7 +11,7 @@ default="uBCAZjYhsVU3Zg8k96GM2c0GqgnTHyj0L3UhNQd4kQTktLyFztesAqb81jucXSMY", ) # https://docs.djangoproject.com/en/dev/ref/settings/#allowed-hosts -ALLOWED_HOSTS = ["localhost", "0.0.0.0", "127.0.0.1"] +ALLOWED_HOSTS = ["localhost", "0.0.0.0", "127.0.0.1", "host.docker.internal"] # CACHES # ------------------------------------------------------------------------------ diff --git a/config/urls.py b/config/urls.py index ece9984f..381dc728 100644 --- a/config/urls.py +++ b/config/urls.py @@ -38,7 +38,6 @@ SpectacularSwaggerView.as_view(url_name="api-schema"), name="api-docs", ), - path("api/", include("backoffice.workflows.urls", namespace="workflow-submission")), ] if settings.DEBUG: