Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
author submissions: initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul committed Jul 10, 2024
1 parent 9ab92f1 commit 7e82ec3
Show file tree
Hide file tree
Showing 15 changed files with 241 additions and 91 deletions.
4 changes: 4 additions & 0 deletions .envs/docker/.django
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ CELERY_FLOWER_PASSWORD=debug
# OpenSearch
OPENSEARCH_HOST=opensearch:9200
OPENSEARCH_INDEX_PREFIX=backoffice-backend-local

# Airflow
AIRFLOW_BASE_URL=http://localhost:8080
AIRFLOW_TOKEN=CHANGE_ME
4 changes: 4 additions & 0 deletions .envs/local/.django
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ CELERY_FLOWER_PASSWORD=debug
# Opensearch
OPENSEARCH_HOST=opensearch:9200
OPENSEARCH_INDEX_PREFIX=backoffice-backend-local

# Airflow
AIRFLOW_BASE_URL=http://host.docker.internal:8082
AIRFLOW_TOKEN=CHANGE_ME
4 changes: 4 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ repos:
rev: "v2.7.1"
hooks:
- id: prettier
- repo: https://github.com/pycqa/isort
rev: "5.12.0"
hooks:
- id: isort
- repo: https://github.com/pycqa/flake8
rev: "3.9.2"
hooks:
Expand Down
39 changes: 17 additions & 22 deletions backoffice/workflows/airflow_utils.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,34 @@
import requests
from os import environ

import requests
from django.http import JsonResponse
from requests.exceptions import HTTPError, RequestException
from requests.exceptions import RequestException
from rest_framework import status

AIRFLOW_BASE_URL = environ.get("AIRFLOW_BASE_URL")

AIRFLOW_HEADERS = {"Content-Type": "application/json", "Authorization": f"Basic {environ.get('AIRFLOW_TOKEN')}"}

AIRFLOW_BASE_URL = environ.get('AIRFLOW_BASE_URL')

AIRFLOW_HEADERS = {
"Content-Type": "application/json",
"Authorization": f"Basic {environ.get('AIRFLOW_TOKEN')}"
}
def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
"""Triggers an airflow dag.
def trigger_airflow_dag(dag_id,workflow_id, extra_data = None):
""" triggers an airflow dag
:param dag_id: name of the dag to run
:param workflow_id: id of the workflow being triggered
:return request response"""
:returns: request response
"""

data = {
"dag_run_id": workflow_id,
"conf":
{
"workflow_id": workflow_id
}
}
data = {"dag_run_id": workflow_id, "conf": {"workflow_id": workflow_id}}

if extra_data is not None:
data["conf"].update(extra_data)

url = f'{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns'
url = f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns"

try:
response = requests.post(url, json=data, headers=AIRFLOW_HEADERS, timeout=300)
response = requests.post(url, json=data, headers=AIRFLOW_HEADERS)
response.raise_for_status()
return JsonResponse(response.json())
except HTTPError as http_err:
return JsonResponse({'error': f'HTTP error occurred: {http_err}'}, status=response.status_code)
except RequestException as req_err:
return JsonResponse({'error': f'Request error occurred: {req_err}'}, status=500)
data = {"error": req_err}
return JsonResponse(data, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
7 changes: 7 additions & 0 deletions backoffice/workflows/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from backoffice.workflows.documents import WorkflowDocument
from backoffice.workflows.models import Workflow, WorkflowTicket

from ..constants import ResolutionDags


class WorkflowSerializer(serializers.ModelSerializer):
class Meta:
Expand All @@ -21,3 +23,8 @@ class WorkflowDocumentSerializer(DocumentSerializer):
class Meta:
document = WorkflowDocument
fields = "__all__"


class AuthorResolutionSerializer(serializers.Serializer):
value = serializers.ChoiceField(choices=ResolutionDags)
create_ticket = serializers.BooleanField(default=False)
60 changes: 24 additions & 36 deletions backoffice/workflows/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
from backoffice.workflows.documents import WorkflowDocument
from backoffice.workflows.models import Workflow, WorkflowTicket

from .serializers import WorkflowDocumentSerializer, WorkflowSerializer, WorkflowTicketSerializer
from ..constants import WORKFLOW_DAG, ResolutionDags
from .serializers import (
AuthorResolutionSerializer,
WorkflowDocumentSerializer,
WorkflowSerializer,
WorkflowTicketSerializer,
)


class WorkflowViewSet(viewsets.ModelViewSet):
Expand Down Expand Up @@ -72,43 +78,25 @@ def create(self, request, *args, **kwargs):
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


class WorflowSubmissionViewSet(viewsets.ViewSet):
@action(detail=False, methods=["post"])
def submit(self, request):

# TODO workflow submission serializer

# create workflow entry
workflow = Workflow.objects.create(
data=request.data, status="approval", core=True, is_update=False, workflow_type="AUTHOR_CREATE"
)

print("Triggering dag")
# response id, corresponds to the new workflow id
response = airflow_utils.trigger_airflow_dag("author_create_initialization_dag", str(workflow.id))

return Response({"data": response.content, "status_code": response.status_code}, status=status.HTTP_200_OK)

@action(detail=False, methods=["post"])
def resolve(self, request):

data = request.data
create_ticket = data["create_ticket"]
resolution = data["resolution"]
extra_data = {"create_ticket": create_ticket, "resolution": resolution}
class AuthorWorkflowViewSet(viewsets.ViewSet):
serializer_class = WorkflowSerializer

if resolution == "accept":
dag_name = "author_create_approved_dag"
elif resolution == "reject":
dag_name = "author_create_rejected_dag"
else:
return Response(
{"message": "resolution method unrecognized"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
def create(self, request):
serializer = self.serializer_class(data=request.data)
if serializer.is_valid(raise_exception=True):
workflow = Workflow.objects.create(
data=serializer.validated_data["data"], workflow_type=serializer.validated_data["workflow_type"]
)
return airflow_utils.trigger_airflow_dag(WORKFLOW_DAG[workflow.workflow_type], str(workflow.id), workflow.data)

@action(detail=True, methods=["post"])
def resolve(self, request, pk=None):
serializer = AuthorResolutionSerializer(data=request.data)
if serializer.is_valid(raise_exception=True):
extra_data = {"create_ticket": serializer.validated_data["create_ticket"]}
return airflow_utils.trigger_airflow_dag(
ResolutionDags[serializer.validated_data["value"]].label, pk, extra_data
)

response = airflow_utils.trigger_airflow_dag(dag_name, data["id"], extra_data)

return Response({"data": response.content, "status_code": response.status_code}, status=status.HTTP_200_OK)


class WorkflowDocumentView(BaseDocumentViewSet):
Expand Down
48 changes: 34 additions & 14 deletions backoffice/workflows/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from django.db import models

# tickets
TICKET_TYPES = (
("author_create_curation", "Author create curation"),
Expand All @@ -6,18 +8,36 @@
DEFAULT_TICKET_TYPE = "author_create_curation"

# workflows
DEFAULT_STATUS_CHOICE = "running"
DEFAULT_WORKFLOW_TYPE = "HEP_create"
STATUS_CHOICES = (
("running", "Running"),
("approval", "Waiting for approval"),
("completed", "Completed"),
("error", "Error"),
)

WORKFLOW_TYPES = (
("HEP_CREATE", "HEP create"),
("HEP_UPDATE", "HEP update"),
("AUTHOR_CREATE", "Author create"),
("AUTHOR_UPDATE", "Author update"),
)

class StatusChoices(models.TextChoices):
RUNNING = "running", "Running"
APPROVAL = "approval", "Waiting for approva"
COMPLETED = "completed", "Completed"
ERROR = "error", "Error"


DEFAULT_STATUS_CHOICE = StatusChoices.RUNNING


class WorkflowType(models.TextChoices):
HEP_CREATE = "HEP_CREATE", "HEP create"
HEP_UPDATE = "HEP_UPDATE", "HEP update"
AUTHOR_CREATE = "AUTHOR_CREATE", "Author create"
AUTHOR_UPDATE = "AUTHOR_UPDATE", "Author update"


DEFAULT_WORKFLOW_TYPE = WorkflowType.HEP_CREATE

# author dags for each workflow type
WORKFLOW_DAG = {
WorkflowType.HEP_CREATE: "",
WorkflowType.HEP_UPDATE: "",
WorkflowType.AUTHOR_CREATE: "author_create_initialization_dag",
WorkflowType.AUTHOR_UPDATE: "author_update_dag",
}


class ResolutionDags(models.TextChoices):
accept = "accept", "author_create_approved_dag"
reject = "reject", "author_create_rejected_dag"
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Generated by Django 4.2.6 on 2024-06-20 09:07

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("workflows", "0006_workflow__created_at_workflow__updated_at"),
]

operations = [
migrations.AlterField(
model_name="workflow",
name="core",
field=models.BooleanField(default=False),
),
migrations.AlterField(
model_name="workflow",
name="is_update",
field=models.BooleanField(default=False),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Generated by Django 4.2.6 on 2024-07-09 12:42

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("workflows", "0007_alter_workflow_core_alter_workflow_is_update"),
]

operations = [
migrations.AlterField(
model_name="workflow",
name="status",
field=models.CharField(
choices=[
("running", "Running"),
("approval", "Waiting for approva"),
("completed", "Completed"),
("error", "Error"),
],
default="running",
max_length=30,
),
),
migrations.AlterField(
model_name="workflow",
name="workflow_type",
field=models.CharField(
choices=[
("HEP_CREATE", "HEP create"),
("HEP_UPDATE", "HEP update"),
("AUTHOR_CREATE", "Author create"),
("AUTHOR_UPDATE", "Author update"),
],
default="HEP_CREATE",
max_length=30,
),
),
]
12 changes: 6 additions & 6 deletions backoffice/workflows/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
DEFAULT_STATUS_CHOICE,
DEFAULT_TICKET_TYPE,
DEFAULT_WORKFLOW_TYPE,
STATUS_CHOICES,
TICKET_TYPES,
WORKFLOW_TYPES,
StatusChoices,
WorkflowType,
)


Expand All @@ -17,17 +17,17 @@ class Workflow(models.Model):

workflow_type = models.CharField(
max_length=30,
choices=WORKFLOW_TYPES,
choices=WorkflowType.choices,
default=DEFAULT_WORKFLOW_TYPE,
)
data = models.JSONField()
status = models.CharField(
max_length=30,
choices=STATUS_CHOICES,
choices=StatusChoices.choices,
default=DEFAULT_STATUS_CHOICE,
)
core = models.BooleanField()
is_update = models.BooleanField()
core = models.BooleanField(default=False)
is_update = models.BooleanField(default=False)

_created_at = models.DateTimeField(auto_now_add=True)
_updated_at = models.DateTimeField(auto_now=True)
Expand Down
Loading

0 comments on commit 7e82ec3

Please sign in to comment.