Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
author-consolidation: fixed aggreggations, added swagger documentatio…
Browse files Browse the repository at this point in the history
…n, updated partialupdate but needs testing
  • Loading branch information
DonHaul authored and drjova committed Aug 6, 2024
1 parent c1c7819 commit 351ec1d
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 45 deletions.
50 changes: 49 additions & 1 deletion backoffice/backoffice/workflows/api/serializers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from django_elasticsearch_dsl_drf.serializers import DocumentSerializer
from drf_spectacular.utils import OpenApiExample, extend_schema_serializer
from rest_framework import serializers

from backoffice.workflows.constants import ResolutionDags
from backoffice.workflows.constants import ResolutionDags, StatusChoices, WorkflowType
from backoffice.workflows.documents import WorkflowDocument
from backoffice.workflows.models import Workflow, WorkflowTicket

Expand All @@ -24,6 +25,53 @@ class Meta:
fields = "__all__"


@extend_schema_serializer(
exclude_fields=[
"_created_at",
"_updated_at",
], # Exclude internal fields from schema
examples=[
OpenApiExample(
"Author Workflow Serializer",
summary="Author Workflow Serializer no data",
description="Author Workflow Serializer",
value={
"workflow_type": WorkflowType.AUTHOR_CREATE,
"status": StatusChoices.RUNNING,
"core": False,
"is_update": False,
"data": {},
},
),
],
)
class WorkflowAuthorSerializer(WorkflowSerializer):
def validate_workflow_type(self, value):
allowed_workflow_types = [
WorkflowType.AUTHOR_CREATE,
WorkflowType.AUTHOR_UPDATE,
]
if value not in allowed_workflow_types:
raise serializers.ValidationError(
f"The field `workflow_type` should be on of {allowed_workflow_types}"
)
return value


@extend_schema_serializer(
examples=[
OpenApiExample(
"Accept",
description="Author Workflow Serializer",
value={"value": "accept", "create_ticket": False},
),
OpenApiExample(
"Reject",
description="Author Workflow Serializer",
value={"value": "reject", "create_ticket": False},
),
],
)
class AuthorResolutionSerializer(serializers.Serializer):
value = serializers.ChoiceField(choices=ResolutionDags)
create_ticket = serializers.BooleanField(default=False)
124 changes: 108 additions & 16 deletions backoffice/backoffice/workflows/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
OrderingFilterBackend,
)
from django_elasticsearch_dsl_drf.viewsets import BaseDocumentViewSet
from drf_spectacular.utils import (
OpenApiExample,
OpenApiParameter,
OpenApiTypes,
extend_schema,
extend_schema_view,
)
from opensearch_dsl import TermsFacet
from rest_framework import status, viewsets
from rest_framework.decorators import action
Expand All @@ -18,11 +25,17 @@
from backoffice.workflows import airflow_utils
from backoffice.workflows.api.serializers import (
AuthorResolutionSerializer,
WorkflowAuthorSerializer,
WorkflowDocumentSerializer,
WorkflowSerializer,
WorkflowTicketSerializer,
)
from backoffice.workflows.constants import WORKFLOW_DAGS, ResolutionDags
from backoffice.workflows.constants import (
WORKFLOW_DAGS,
ResolutionDags,
StatusChoices,
WorkflowType,
)
from backoffice.workflows.documents import WorkflowDocument
from backoffice.workflows.models import Workflow, WorkflowTicket

Expand All @@ -40,19 +53,6 @@ def get_queryset(self):
return self.queryset


class WorkflowPartialUpdateViewSet(viewsets.ViewSet):
def partial_update(self, request, pk=None):
workflow_instance = get_object_or_404(Workflow, pk=pk)
serializer = WorkflowSerializer(
workflow_instance, data=request.data, partial=True
)

if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)


class WorkflowTicketViewSet(viewsets.ViewSet):
def retrieve(self, request, *args, **kwargs):
workflow_id = kwargs.get("pk")
Expand Down Expand Up @@ -101,8 +101,13 @@ def create(self, request, *args, **kwargs):


class AuthorWorkflowViewSet(viewsets.ViewSet):
serializer_class = WorkflowSerializer
serializer_class = WorkflowAuthorSerializer

@extend_schema(
summary="Create a New Author",
description="Creates a new author, launches the required airflow dags.",
request=serializer_class,
)
def create(self, request):
logger.info("Creating workflow with data: %s", request.data)
serializer = self.serializer_class(data=request.data)
Expand All @@ -122,6 +127,34 @@ def create(self, request):
workflow.data,
)

@extend_schema(
summary="Partially Updates Author",
description="Updates specific fields of the author.",
examples=[
OpenApiExample(
"Status Update",
value={
"status": StatusChoices.COMPLETED,
},
),
],
)
def partial_update(self, request, pk=None):
logger.info("Updating workflow with data: %s", request.data)
workflow_instance = get_object_or_404(Workflow, pk=pk)
serializer = self.serializer_class(
workflow_instance, data=request.data, partial=True
)

if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data)

@extend_schema(
summary="Accept or Reject Author",
description="Acceps or rejects an author, run associated dags.",
request=AuthorResolutionSerializer,
)
@action(detail=True, methods=["post"])
def resolve(self, request, pk=None):
logger.info("Resolving data: %s", request.data)
Expand All @@ -138,6 +171,24 @@ def resolve(self, request, pk=None):
ResolutionDags[serializer.validated_data["value"]].label, pk, extra_data
)

@extend_schema(
summary="Restart an Author Workflow",
description="Restart an Author Workflow.",
examples=[
OpenApiExample(
"Restart Whole Workflow",
value={},
),
OpenApiExample(
"Restart Failing Task",
value={"restart_current_task": True},
),
OpenApiExample(
"Restart Workflow with Custom Parameters",
value={"params": {}},
),
],
)
@action(detail=True, methods=["post"])
def restart(self, request, pk=None):
workflow = Workflow.objects.get(id=pk)
Expand All @@ -150,6 +201,44 @@ def restart(self, request, pk=None):
)


@extend_schema_view(
list=extend_schema(
summary="Search with opensearch",
description="text",
parameters=[
OpenApiParameter(
name="search",
description="Search for status and workflow_type",
required=False,
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
),
OpenApiParameter(
name="ordering",
description="order by _updated_at",
required=False,
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
),
OpenApiParameter(
name="status",
description="status",
required=False,
type=OpenApiTypes.STR,
enum=StatusChoices.values,
location=OpenApiParameter.QUERY,
),
OpenApiParameter(
name="workflow_type",
description="workflow_type",
required=False,
type=OpenApiTypes.STR,
enum=WorkflowType.values,
location=OpenApiParameter.QUERY,
),
],
),
)
class WorkflowDocumentView(BaseDocumentViewSet):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand All @@ -171,7 +260,10 @@ def __init__(self, *args, **kwargs):
"is_update",
}

filter_fields = {"status": "status", "workflow_type": "workflow_type"}
filter_fields = {
"status": "status.keyword",
"workflow_type": "workflow_type.keyword",
}

ordering_fields = {"_updated_at": "_updated_at"}

Expand Down
10 changes: 6 additions & 4 deletions backoffice/backoffice/workflows/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def test_list_anonymous(self):
self.assertEqual(response.status_code, 403)


# @pytest.mark.usefixtures("rebuild_opensearch_index")
class TestWorkflowSearchViewSet(BaseTransactionTestCase):
endpoint = "/api/workflows/search/"
reset_sequences = True
Expand Down Expand Up @@ -109,7 +108,7 @@ def test_list_anonymous(self):
self.assertEqual(response.status_code, 403)


class TestWorkflowPartialUpdateViewSet(BaseTransactionTestCase):
class TestAuthorWorkflowPartialUpdateViewSet(BaseTransactionTestCase):
endpoint_base_url = "/api/workflow-update"
reset_sequences = True
fixtures = ["backoffice/fixtures/groups.json"]
Expand All @@ -122,7 +121,10 @@ def setUp(self):

@property
def endpoint(self):
return f"{self.endpoint_base_url}/{self.workflow.id}/"
return reverse(
"api:workflows-authors-detail",
kwargs={"pk": self.workflow.id},
)

def test_patch_curator(self):
self.api_client.force_authenticate(user=self.curator)
Expand Down Expand Up @@ -427,7 +429,7 @@ def test_search_workflow_type(self):
def test_filter_status(self):
self.api_client.force_authenticate(user=self.admin)

url = reverse("search:workflow-list") + f'?status="={StatusChoices.RUNNING}'
url = reverse("search:workflow-list") + f"?status={StatusChoices.RUNNING}"

response = self.api_client.get(url)

Expand Down
4 changes: 0 additions & 4 deletions backoffice/config/api_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from backoffice.users.api.views import UserViewSet
from backoffice.workflows.api.views import (
AuthorWorkflowViewSet,
WorkflowPartialUpdateViewSet,
WorkflowTicketViewSet,
WorkflowViewSet,
)
Expand All @@ -20,9 +19,6 @@
),
)
router.register("workflows", WorkflowViewSet, basename="workflows")
router.register(
"workflow-update", WorkflowPartialUpdateViewSet, basename="workflow-update"
)
(router.register("workflow-ticket", WorkflowTicketViewSet, basename="workflow-ticket"),)
app_name = "api"
urlpatterns = router.urls
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ x-airflow-common: &airflow-common
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
# AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE: "true" # used when modifying plugins
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
Expand Down
14 changes: 10 additions & 4 deletions workflows/dags/author/author_create/author_create_approved.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.decorators import dag, task
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule
from hooks.backoffice.workflow_management_hook import WorkflowManagementHook
from hooks.backoffice.workflow_management_hook import AUTHORS, WorkflowManagementHook
from hooks.backoffice.workflow_ticket_management_hook import (
WorkflowTicketManagementHook,
)
Expand Down Expand Up @@ -55,7 +55,9 @@ def author_create_approved_dag():
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
typ=AUTHORS,
)

@task.branch()
Expand Down Expand Up @@ -130,7 +132,9 @@ def close_author_create_user_ticket(**context: dict) -> None:
def set_author_create_workflow_status_to_completed(**context: dict) -> None:
status_name = "completed"
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
typ=AUTHORS,
)

@task
Expand All @@ -144,7 +148,9 @@ def set_author_create_workflow_status_to_error(**context: dict) -> None:
status_name = ti.xcom_pull(task_ids="create_author_on_inspire")
logger.info(f"Workflow status: {status_name}")
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
typ=AUTHORS,
)

# task definitions
Expand Down
11 changes: 8 additions & 3 deletions workflows/dags/author/author_create/author_create_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from hooks.backoffice.workflow_management_hook import WorkflowManagementHook
from hooks.backoffice.workflow_management_hook import AUTHORS, WorkflowManagementHook
from hooks.backoffice.workflow_ticket_management_hook import (
WorkflowTicketManagementHook,
)
Expand Down Expand Up @@ -43,7 +43,9 @@ def author_create_initialization_dag():
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
typ=AUTHORS,
)

@task()
Expand All @@ -52,6 +54,7 @@ def set_schema(**context):
workflow_management_hook.partial_update_workflow(
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={"data": {"$schema": schema}},
typ=AUTHORS,
)

@task()
Expand Down Expand Up @@ -80,7 +83,9 @@ def create_author_create_user_ticket(**context: dict) -> None:
def set_author_create_workflow_status_to_approval(**context: dict) -> None:
status_name = "approval"
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
typ=AUTHORS,
)

# task dependencies
Expand Down
Loading

0 comments on commit 351ec1d

Please sign in to comment.