Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into restart-workflow-data-loss
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul authored Aug 27, 2024
2 parents a6cbbf8 + d52647a commit 739e9fd
Show file tree
Hide file tree
Showing 24 changed files with 1,115 additions and 100 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-workflows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
-v "$(pwd)"/tests:/opt/airflow/tests
-v "$(pwd)"/requirements-test.txt:/opt/airflow/requirements-test.txt
-v "$(pwd)"/data:/opt/airflow/data
-v "$(pwd)"/scripts/variables/variables.json:/opt/airflow/variables.json
-v "$(pwd)"/scripts:/opt/airflow/scripts
-e AIRFLOW__CORE__EXECUTOR=CeleryExecutor
-e AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:[email protected]:5432/airflow
-e AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:[email protected]:5432/airflow
Expand All @@ -61,4 +61,4 @@ jobs:
-e AIRFLOW__CORE__LOAD_EXAMPLES="false"
-e AIRFLOW__API__AUTH_BACKENDS="airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
registry.cern.ch/cern-sis/inspire/workflows@${{ needs.build.outputs.image-id }}
bash -c "pip install -r requirements-test.txt && airflow db init && airflow variables import /opt/airflow/variables.json && pytest /opt/airflow/tests"
bash -c "pip install -r requirements-test.txt && airflow db init && airflow connections import /opt/airflow/scripts/connections/connections.json && airflow variables import /opt/airflow/scripts/variables/variables.json && pytest /opt/airflow/tests"
79 changes: 50 additions & 29 deletions backoffice/backoffice/workflows/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django_json_widget.widgets import JSONEditorWidget

from backoffice.management.permissions import IsAdminOrCuratorUser
from backoffice.workflows.models import Workflow
from backoffice.workflows.models import Decision, Workflow


class WorkflowsAdminSite(admin.AdminSite):
Expand All @@ -30,8 +30,41 @@ def has_permission(self, request):
)


class BaseModelAdmin(admin.ModelAdmin):
def has_view_permission(self, request, obj=None):
"""
Returns True if the user has permission to view the Workflow model.
"""
permission_check = IsAdminOrCuratorUser()
return request.user.is_superuser or permission_check.has_permission(
request, self
)

def has_change_permission(self, request, obj=None):
"""
Returns True if the user has permission to change the Workflow model.
"""
permission_check = IsAdminOrCuratorUser()
return request.user.is_superuser or permission_check.has_permission(
request, self
)

def has_delete_permission(self, request, obj=None):
"""
Returns True if the user has permission to delete the Workflow model.
"""
permission_check = IsAdminOrCuratorUser()
return request.user.is_superuser or permission_check.has_permission(
request, self
)

formfield_overrides = {
JSONField: {"widget": JSONEditorWidget},
}


@admin.register(Workflow)
class WorkflowAdmin(admin.ModelAdmin):
class WorkflowAdmin(BaseModelAdmin):
"""
Admin class for Workflow model. Define get, update and delete permissions.
"""
Expand All @@ -56,33 +89,21 @@ class WorkflowAdmin(admin.ModelAdmin):
"_updated_at",
]

formfield_overrides = {
JSONField: {"widget": JSONEditorWidget},
}

def has_view_permission(self, request, obj=None):
"""
Returns True if the user has permission to view the Workflow model.
"""
permission_check = IsAdminOrCuratorUser()
return request.user.is_superuser or permission_check.has_permission(
request, self
)
@admin.register(Decision)
class DecisionAdmin(BaseModelAdmin):
"""
Admin class for Decision model. Define get, update and delete permissions.
"""

def has_change_permission(self, request, obj=None):
"""
Returns True if the user has permission to change the Workflow model.
"""
permission_check = IsAdminOrCuratorUser()
return request.user.is_superuser or permission_check.has_permission(
request, self
)
ordering = ("-_updated_at",)
search_fields = ["id", "data"]
list_display = ("id", "action_value", "user", "workflow_id")
list_filter = [
"action",
"user",
]

def has_delete_permission(self, request, obj=None):
"""
Returns True if the user has permission to delete the Workflow model.
"""
permission_check = IsAdminOrCuratorUser()
return request.user.is_superuser or permission_check.has_permission(
request, self
)
@admin.display(description="action")
def action_value(self, obj):
return obj.action
18 changes: 17 additions & 1 deletion backoffice/backoffice/workflows/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ def restart_workflow_dags(workflow_id, workflow_type, params=None):
:param params: parameters of new dag execution
:returns: request response
"""
delete_workflow_dag_runs(workflow_id, workflow_type)

return trigger_airflow_dag(
WORKFLOW_DAGS[workflow_type].initialize, str(workflow_id), params
)


def delete_workflow_dag_runs(workflow_id, workflow_type):
"""Deletes runs of a given workflow.
:param workflow_id: workflow_id for dags that should be restarted
:param workflow_type: type of workflow the will be restarted
"""
executed_dags_for_workflow = find_executed_dags(workflow_id, workflow_type)

data = None
Expand All @@ -166,6 +179,9 @@ def restart_workflow_dags(workflow_id, workflow_type, params=None):
if data is None:
data = dag_data["conf"].get("data")

return trigger_airflow_dag(
trigger_airflow_dag(
WORKFLOW_DAGS[workflow_type].initialize, str(workflow_id), params or data

return JsonResponse(
data={"message": f"Dag runs for worfklow {workflow_id} have been deleted"}
)
12 changes: 11 additions & 1 deletion backoffice/backoffice/workflows/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

from backoffice.workflows.constants import ResolutionDags, StatusChoices, WorkflowType
from backoffice.workflows.documents import WorkflowDocument
from backoffice.workflows.models import Workflow, WorkflowTicket
from backoffice.workflows.models import Decision, Workflow, WorkflowTicket


class WorkflowTicketSerializer(serializers.ModelSerializer):
ticket_url = serializers.SerializerMethodField()
workflow_id = serializers.PrimaryKeyRelatedField(queryset=Workflow.objects.all())

class Meta:
model = WorkflowTicket
Expand All @@ -23,8 +24,17 @@ def get_ticket_url(self, obj):
)


class DecisionSerializer(serializers.ModelSerializer):
workflow = serializers.PrimaryKeyRelatedField(queryset=Workflow.objects.all())

class Meta:
model = Decision
fields = "__all__"


class WorkflowSerializer(serializers.ModelSerializer):
tickets = WorkflowTicketSerializer(many=True, read_only=True)
decisions = DecisionSerializer(many=True, read_only=True)

class Meta:
model = Workflow
Expand Down
11 changes: 11 additions & 0 deletions backoffice/backoffice/workflows/api/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from backoffice.workflows.api.serializers import DecisionSerializer


def add_decision(workflow_id, user, action):
serializer_class = DecisionSerializer
data = {"workflow": workflow_id, "user": user, "action": action}

serializer = serializer_class(data=data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return serializer.data
60 changes: 34 additions & 26 deletions backoffice/backoffice/workflows/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from backoffice.utils.pagination import OSStandardResultsSetPagination
from backoffice.workflows import airflow_utils
from backoffice.workflows.api import utils
from backoffice.workflows.api.serializers import (
AuthorResolutionSerializer,
WorkflowAuthorSerializer,
Expand All @@ -37,7 +38,7 @@
WorkflowType,
)
from backoffice.workflows.documents import WorkflowDocument
from backoffice.workflows.models import Workflow, WorkflowTicket
from backoffice.workflows.models import Decision, Workflow, WorkflowTicket

logger = logging.getLogger(__name__)

Expand All @@ -52,8 +53,14 @@ def get_queryset(self):
return self.queryset.filter(status__status=status)
return self.queryset

def perform_destroy(self, instance):
airflow_utils.delete_workflow_dag_runs(instance.id, instance.workflow_type)
super().perform_destroy(instance)


class WorkflowTicketViewSet(viewsets.ViewSet):
serializer_class = WorkflowTicketSerializer

def retrieve(self, request, *args, **kwargs):
workflow_id = kwargs.get("pk")
ticket_type = request.query_params.get("ticket_type")
Expand All @@ -68,7 +75,7 @@ def retrieve(self, request, *args, **kwargs):
workflow_ticket = WorkflowTicket.objects.get(
workflow_id=workflow_id, ticket_type=ticket_type
)
serializer = WorkflowTicketSerializer(workflow_ticket)
serializer = self.serializer_class(workflow_ticket)
return Response(serializer.data)
except WorkflowTicket.DoesNotExist:
return Response(
Expand All @@ -77,27 +84,21 @@ def retrieve(self, request, *args, **kwargs):
)

def create(self, request, *args, **kwargs):
workflow_id = request.data.get("workflow_id")
ticket_type = request.data.get("ticket_type")
ticket_id = request.data.get("ticket_id")

if not all([workflow_id, ticket_type, ticket_id]):
return Response(
{"error": "Workflow_id, ticket_id and ticket_type are required."},
status=status.HTTP_400_BAD_REQUEST,
)
serializer = self.serializer_class(data=request.data)

try:
workflow = Workflow.objects.get(id=workflow_id)
workflow_ticket = WorkflowTicket.objects.create(
workflow_id=workflow, ticket_id=ticket_id, ticket_type=ticket_type
)
serializer = WorkflowTicketSerializer(workflow_ticket)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
except Exception as e:
return Response(
{"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)


class DecisionViewSet(viewsets.ModelViewSet):
queryset = Decision.objects.all()

def create(self, request, *args, **kwargs):
data = utils.add_decision(
request.data["workflow_id"], request.user, request.data["action"]
)
return Response(data, status=status.HTTP_201_CREATED)


class AuthorWorkflowViewSet(viewsets.ViewSet):
Expand All @@ -121,11 +122,12 @@ def create(self, request):
WORKFLOW_DAGS[workflow.workflow_type].initialize,
workflow.id,
)
return airflow_utils.trigger_airflow_dag(
airflow_utils.trigger_airflow_dag(
WORKFLOW_DAGS[workflow.workflow_type].initialize,
str(workflow.id),
workflow.data,
)
return Response(serializer.data, status=status.HTTP_201_CREATED)

@extend_schema(
summary="Partially Updates Author",
Expand Down Expand Up @@ -160,16 +162,22 @@ def resolve(self, request, pk=None):
logger.info("Resolving data: %s", request.data)
serializer = AuthorResolutionSerializer(data=request.data)
if serializer.is_valid(raise_exception=True):
extra_data = {"create_ticket": serializer.validated_data["create_ticket"]}
extra_data = serializer.validated_data
logger.info(
"Trigger Airflow DAG: %s for %s",
ResolutionDags[serializer.validated_data["value"]],
pk,
)
utils.add_decision(pk, request.user, serializer.validated_data["value"])

return airflow_utils.trigger_airflow_dag(
airflow_utils.trigger_airflow_dag(
ResolutionDags[serializer.validated_data["value"]].label, pk, extra_data
)
workflow_serializer = self.serializer_class(
get_object_or_404(Workflow, pk=pk)
)

return Response(workflow_serializer.data)

@extend_schema(
summary="Restart an Author Workflow",
Expand Down Expand Up @@ -270,9 +278,9 @@ def __init__(self, *args, **kwargs):
"is_update": "is_update",
}

ordering_fields = {"_updated_at": "_updated_at"}
ordering_fields = {"_updated_at": "_updated_at", "_score": "_score"}

ordering = ("-_updated_at",)
ordering = ("-_updated_at", "-_score")

faceted_search_fields = {
"status": {
Expand Down
4 changes: 4 additions & 0 deletions backoffice/backoffice/workflows/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class WorkflowType(models.TextChoices):
class ResolutionDags(models.TextChoices):
accept = "accept", "author_create_approved_dag"
reject = "reject", "author_create_rejected_dag"
accept_curate = "accept_curate", "author_create_approved_dag"


DECISION_CHOICES = ResolutionDags.choices


class AuthorCreateDags(models.TextChoices):
Expand Down
58 changes: 58 additions & 0 deletions backoffice/backoffice/workflows/migrations/0009_decision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Generated by Django 4.2.6 on 2024-08-15 12:25

import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("workflows", "0008_alter_workflow_status_alter_workflow_workflow_type"),
]

operations = [
migrations.CreateModel(
name="Decision",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"action",
models.CharField(
choices=[
("accept", "author_create_approved_dag"),
("reject", "author_create_rejected_dag"),
("accept_curate", "author_create_approved_dag"),
],
max_length=30,
),
),
("_created_at", models.DateTimeField(auto_now_add=True)),
("_updated_at", models.DateTimeField(auto_now=True)),
(
"user",
models.ForeignKey(
db_column="email",
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
to_field="email",
),
),
(
"workflow",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="workflows.workflow",
),
),
],
),
]
Loading

0 comments on commit 739e9fd

Please sign in to comment.