Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

author-consolidation #60

Merged
merged 2 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion backoffice/backoffice/workflows/api/serializers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from django_elasticsearch_dsl_drf.serializers import DocumentSerializer
from drf_spectacular.utils import OpenApiExample, extend_schema_serializer
from rest_framework import serializers

from backoffice.workflows.constants import ResolutionDags
from backoffice.workflows.constants import ResolutionDags, StatusChoices, WorkflowType
from backoffice.workflows.documents import WorkflowDocument
from backoffice.workflows.models import Workflow, WorkflowTicket

Expand All @@ -24,6 +25,53 @@ class Meta:
fields = "__all__"


@extend_schema_serializer(
exclude_fields=[
"_created_at",
"_updated_at",
], # Exclude internal fields from schema
examples=[
OpenApiExample(
"Author Workflow Serializer",
summary="Author Workflow Serializer no data",
description="Author Workflow Serializer",
value={
"workflow_type": WorkflowType.AUTHOR_CREATE,
"status": StatusChoices.RUNNING,
"core": False,
"is_update": False,
"data": {},
},
),
],
)
class WorkflowAuthorSerializer(WorkflowSerializer):
def validate_workflow_type(self, value):
allowed_workflow_types = [
WorkflowType.AUTHOR_CREATE,
WorkflowType.AUTHOR_UPDATE,
]
if value not in allowed_workflow_types:
raise serializers.ValidationError(
f"The field `workflow_type` should be on of {allowed_workflow_types}"
)
return value


@extend_schema_serializer(
examples=[
OpenApiExample(
"Accept",
description="Author Workflow Serializer",
value={"value": "accept", "create_ticket": False},
),
OpenApiExample(
"Reject",
description="Author Workflow Serializer",
value={"value": "reject", "create_ticket": False},
),
],
)
class AuthorResolutionSerializer(serializers.Serializer):
value = serializers.ChoiceField(choices=ResolutionDags)
create_ticket = serializers.BooleanField(default=False)
124 changes: 108 additions & 16 deletions backoffice/backoffice/workflows/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
OrderingFilterBackend,
)
from django_elasticsearch_dsl_drf.viewsets import BaseDocumentViewSet
from drf_spectacular.utils import (
OpenApiExample,
OpenApiParameter,
OpenApiTypes,
extend_schema,
extend_schema_view,
)
from opensearch_dsl import TermsFacet
from rest_framework import status, viewsets
from rest_framework.decorators import action
Expand All @@ -18,11 +25,17 @@
from backoffice.workflows import airflow_utils
from backoffice.workflows.api.serializers import (
AuthorResolutionSerializer,
WorkflowAuthorSerializer,
WorkflowDocumentSerializer,
WorkflowSerializer,
WorkflowTicketSerializer,
)
from backoffice.workflows.constants import WORKFLOW_DAGS, ResolutionDags
from backoffice.workflows.constants import (
WORKFLOW_DAGS,
ResolutionDags,
StatusChoices,
WorkflowType,
)
from backoffice.workflows.documents import WorkflowDocument
from backoffice.workflows.models import Workflow, WorkflowTicket

Expand All @@ -40,19 +53,6 @@ def get_queryset(self):
return self.queryset


class WorkflowPartialUpdateViewSet(viewsets.ViewSet):
def partial_update(self, request, pk=None):
workflow_instance = get_object_or_404(Workflow, pk=pk)
serializer = WorkflowSerializer(
workflow_instance, data=request.data, partial=True
)

if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)


class WorkflowTicketViewSet(viewsets.ViewSet):
def retrieve(self, request, *args, **kwargs):
workflow_id = kwargs.get("pk")
Expand Down Expand Up @@ -101,8 +101,13 @@ def create(self, request, *args, **kwargs):


class AuthorWorkflowViewSet(viewsets.ViewSet):
serializer_class = WorkflowSerializer
serializer_class = WorkflowAuthorSerializer

@extend_schema(
summary="Create a New Author",
description="Creates a new author, launches the required airflow dags.",
request=serializer_class,
)
def create(self, request):
logger.info("Creating workflow with data: %s", request.data)
serializer = self.serializer_class(data=request.data)
Expand All @@ -122,6 +127,34 @@ def create(self, request):
workflow.data,
)

@extend_schema(
summary="Partially Updates Author",
description="Updates specific fields of the author.",
examples=[
OpenApiExample(
"Status Update",
value={
"status": StatusChoices.COMPLETED,
},
),
],
)
def partial_update(self, request, pk=None):
logger.info("Updating workflow with data: %s", request.data)
workflow_instance = get_object_or_404(Workflow, pk=pk)
serializer = self.serializer_class(
workflow_instance, data=request.data, partial=True
)

if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data)

@extend_schema(
summary="Accept or Reject Author",
description="Acceps or rejects an author, run associated dags.",
request=AuthorResolutionSerializer,
)
@action(detail=True, methods=["post"])
def resolve(self, request, pk=None):
logger.info("Resolving data: %s", request.data)
Expand All @@ -138,6 +171,24 @@ def resolve(self, request, pk=None):
ResolutionDags[serializer.validated_data["value"]].label, pk, extra_data
)

@extend_schema(
summary="Restart an Author Workflow",
description="Restart an Author Workflow.",
examples=[
OpenApiExample(
"Restart Whole Workflow",
value={},
),
OpenApiExample(
"Restart Failing Task",
value={"restart_current_task": True},
),
OpenApiExample(
"Restart Workflow with Custom Parameters",
value={"params": {}},
),
],
)
@action(detail=True, methods=["post"])
def restart(self, request, pk=None):
workflow = Workflow.objects.get(id=pk)
Expand All @@ -150,6 +201,44 @@ def restart(self, request, pk=None):
)


@extend_schema_view(
list=extend_schema(
summary="Search with opensearch",
description="text",
parameters=[
OpenApiParameter(
name="search",
description="Search for status and workflow_type",
required=False,
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
),
OpenApiParameter(
name="ordering",
description="order by _updated_at",
required=False,
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
),
OpenApiParameter(
name="status",
description="status",
required=False,
type=OpenApiTypes.STR,
enum=StatusChoices.values,
location=OpenApiParameter.QUERY,
),
OpenApiParameter(
name="workflow_type",
description="workflow_type",
required=False,
type=OpenApiTypes.STR,
enum=WorkflowType.values,
location=OpenApiParameter.QUERY,
),
],
),
)
class WorkflowDocumentView(BaseDocumentViewSet):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand All @@ -171,7 +260,10 @@ def __init__(self, *args, **kwargs):
"is_update",
}

filter_fields = {"status": "status", "workflow_type": "workflow_type"}
filter_fields = {
"status": "status.keyword",
"workflow_type": "workflow_type.keyword",
}

ordering_fields = {"_updated_at": "_updated_at"}

Expand Down
10 changes: 6 additions & 4 deletions backoffice/backoffice/workflows/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def test_list_anonymous(self):
self.assertEqual(response.status_code, 403)


# @pytest.mark.usefixtures("rebuild_opensearch_index")
class TestWorkflowSearchViewSet(BaseTransactionTestCase):
endpoint = "/api/workflows/search/"
reset_sequences = True
Expand Down Expand Up @@ -109,7 +108,7 @@ def test_list_anonymous(self):
self.assertEqual(response.status_code, 403)


class TestWorkflowPartialUpdateViewSet(BaseTransactionTestCase):
class TestAuthorWorkflowPartialUpdateViewSet(BaseTransactionTestCase):
endpoint_base_url = "/api/workflow-update"
reset_sequences = True
fixtures = ["backoffice/fixtures/groups.json"]
Expand All @@ -122,7 +121,10 @@ def setUp(self):

@property
def endpoint(self):
return f"{self.endpoint_base_url}/{self.workflow.id}/"
return reverse(
"api:workflows-authors-detail",
kwargs={"pk": self.workflow.id},
)

def test_patch_curator(self):
self.api_client.force_authenticate(user=self.curator)
Expand Down Expand Up @@ -427,7 +429,7 @@ def test_search_workflow_type(self):
def test_filter_status(self):
self.api_client.force_authenticate(user=self.admin)

url = reverse("search:workflow-list") + f'?status="={StatusChoices.RUNNING}'
url = reverse("search:workflow-list") + f"?status={StatusChoices.RUNNING}"

response = self.api_client.get(url)

Expand Down
4 changes: 0 additions & 4 deletions backoffice/config/api_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from backoffice.users.api.views import UserViewSet
from backoffice.workflows.api.views import (
AuthorWorkflowViewSet,
WorkflowPartialUpdateViewSet,
WorkflowTicketViewSet,
WorkflowViewSet,
)
Expand All @@ -20,9 +19,6 @@
),
)
router.register("workflows", WorkflowViewSet, basename="workflows")
router.register(
"workflow-update", WorkflowPartialUpdateViewSet, basename="workflow-update"
)
(router.register("workflow-ticket", WorkflowTicketViewSet, basename="workflow-ticket"),)
app_name = "api"
urlpatterns = router.urls
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ x-airflow-common: &airflow-common
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
AIRFLOW__WEBSERVER__RELOAD_ON_PLUGIN_CHANGE: "true" # used when modifying plugins
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
Expand Down
14 changes: 10 additions & 4 deletions workflows/dags/author/author_create/author_create_approved.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.decorators import dag, task
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule
from hooks.backoffice.workflow_management_hook import WorkflowManagementHook
from hooks.backoffice.workflow_management_hook import AUTHORS, WorkflowManagementHook
from hooks.backoffice.workflow_ticket_management_hook import (
WorkflowTicketManagementHook,
)
Expand Down Expand Up @@ -55,7 +55,9 @@ def author_create_approved_dag():
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
)

@task.branch()
Expand Down Expand Up @@ -130,7 +132,9 @@ def close_author_create_user_ticket(**context: dict) -> None:
def set_author_create_workflow_status_to_completed(**context: dict) -> None:
status_name = "completed"
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
)

@task
Expand All @@ -144,7 +148,9 @@ def set_author_create_workflow_status_to_error(**context: dict) -> None:
status_name = ti.xcom_pull(task_ids="create_author_on_inspire")
logger.info(f"Workflow status: {status_name}")
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
)

# task definitions
Expand Down
11 changes: 8 additions & 3 deletions workflows/dags/author/author_create/author_create_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from hooks.backoffice.workflow_management_hook import WorkflowManagementHook
from hooks.backoffice.workflow_management_hook import AUTHORS, WorkflowManagementHook
from hooks.backoffice.workflow_ticket_management_hook import (
WorkflowTicketManagementHook,
)
Expand Down Expand Up @@ -43,7 +43,9 @@ def author_create_initialization_dag():
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
)

@task()
Expand All @@ -52,6 +54,7 @@ def set_schema(**context):
workflow_management_hook.partial_update_workflow(
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={"data": {"$schema": schema}},
collection=AUTHORS,
)

@task()
Expand Down Expand Up @@ -80,7 +83,9 @@ def create_author_create_user_ticket(**context: dict) -> None:
def set_author_create_workflow_status_to_approval(**context: dict) -> None:
status_name = "approval"
workflow_management_hook.set_workflow_status(
status_name=status_name, workflow_id=context["params"]["workflow_id"]
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
)

# task dependencies
Expand Down
Loading
Loading