Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
decisions: added to endpoint to resolve
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul committed Aug 20, 2024
1 parent e083725 commit 1ab97e1
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 37 deletions.
31 changes: 14 additions & 17 deletions backoffice/backoffice/workflows/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@
logger = logging.getLogger(__name__)


def add_decision(workflow_id, user, action):
serializer_class = DecisionSerializer
data = {"workflow": workflow_id, "user": user, "action": action}

serializer = serializer_class(data=data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)


class WorkflowViewSet(viewsets.ModelViewSet):
queryset = Workflow.objects.all()
serializer_class = WorkflowSerializer
Expand Down Expand Up @@ -103,25 +113,11 @@ def create(self, request, *args, **kwargs):

class DecisionViewSet(viewsets.ModelViewSet):
queryset = Decision.objects.all()
serializer_class = DecisionSerializer

def get_queryset(self):
status = self.request.query_params.get("status")
if status:
return self.queryset.filter(status__status=status)
return self.queryset

def create(self, request, *args, **kwargs):
data = {
"workflow": request.data["workflow_id"],
"user": request.user,
"action": request.data["action"],
}

serializer = self.serializer_class(data=data)
if serializer.is_valid(raise_exception=True):
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return add_decision(
request.data["workflow_id"], request.user, request.data["action"]
)


class AuthorWorkflowViewSet(viewsets.ViewSet):
Expand Down Expand Up @@ -190,6 +186,7 @@ def resolve(self, request, pk=None):
ResolutionDags[serializer.validated_data["value"]],
pk,
)
add_decision(pk, request.user, serializer.validated_data["value"])

return airflow_utils.trigger_airflow_dag(
ResolutionDags[serializer.validated_data["value"]].label, pk, extra_data
Expand Down
10 changes: 10 additions & 0 deletions workflows/Dockerfile copy
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM apache/airflow:2.9.3-python3.11

WORKDIR /opt/airflow

COPY --chown=airflow:root dags ./dags/
COPY --chown=airflow:root plugins ./plugins/
COPY --chown=airflow:root requirements.txt ./requirements.txt
COPY --chown=airflow:root requirements-test.txt ./requirements-test.txt

RUN pip install --no-cache-dir -r requirements.txt -r requirements-test.txt
File renamed without changes.
72 changes: 72 additions & 0 deletions workflows/dags/author/author_create/process_until_breakpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import datetime
import json

from airflow.decorators import dag, task
from airflow.operators.python import ShortCircuitOperator
from airflow.utils.trigger_rule import TriggerRule


@dag(
start_date=datetime.datetime(2021, 1, 1),
schedule=None,
params={"approved": True},
)
def process_untill_breakpoint():
def check_approval(**context):
return not context["params"]["approved"]

@task
def fetch_document(filename: str) -> dict:
from include.utils.s3_client import get_s3_client

s3_client = get_s3_client()
s3_client.download_file("inspire-incoming", filename, f"./{filename}")
with open(f"./{filename}") as f:
data = json.load(f)
return data

@task()
def normalize_affiliations(data):
from hooks.inspire_connection_hook import call_inspire_api_with_hook
from include.inspire.affiliations_normalization import (
assign_normalized_affiliations,
)

endpoint = "/curation/literature/affiliations-normalization"
request_data = {"authors": data["authors"], "workflow_id": 1}
result = call_inspire_api_with_hook(endpoint=endpoint, data=request_data)
data = assign_normalized_affiliations(result.json(), data=data)
return data

def auto_approval(**kwargs):
from include.inspire.approval import auto_approve

data = kwargs["task_instance"].xcom_pull(task_ids="normalize_affiliations")
return bool(auto_approve(data))

@task(trigger_rule=TriggerRule.NONE_FAILED)
def validate():
return

check_approval = ShortCircuitOperator(
task_id="check_approval",
ignore_downstream_trigger_rules=False,
python_callable=check_approval,
)
fetch_document_task = fetch_document("test.json")
normalize_affiliations_task = normalize_affiliations(fetch_document_task)
auto_approval = ShortCircuitOperator(
task_id="auto_approval", python_callable=auto_approval
)
validation = validate()

(
check_approval
>> fetch_document_task
>> normalize_affiliations_task
>> auto_approval
>> validation
)


process_untill_breakpoint()
14 changes: 14 additions & 0 deletions workflows/dags/author/author_create/shared_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from airflow.decorators import task
from hooks.backoffice.base import BackofficeHook


@task()
def create_decision_on_curation_choice(**context):
print("wow")
print(context)
data = {
"action": context["params"]["data"]["value"],
"workflow_id": context["params"]["workflow_id"],
}

return BackofficeHook().request(method="POST", data=data, endpoint="api/decisions/")
38 changes: 38 additions & 0 deletions workflows/plugins/hooks/backoffice/decision_management_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from hooks.backoffice.base import BackofficeHook
from requests import Response


class WorkflowTicketManagementHook(BackofficeHook):
"""
A hook to update the status of a workflow in the backoffice system.
:param method: The HTTP method to use for the request (default: "GET").
:type method: str
:param http_conn_id: The ID of the HTTP connection to use (
default: "backoffice_conn").
:type http_conn_id: str
"""

def __init__(
self,
method: str = "GET",
http_conn_id: str = "backoffice_conn",
headers: dict = None,
) -> None:
super().__init__(method, http_conn_id, headers)
self.endpoint = "api/decision/"

def create_decision_entry(
self, workflow_id: str, user_id: str, action: str
) -> Response:
data = {
"user_id": user_id,
"action": action,
"workflow_id": workflow_id,
}
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="POST",
data=data,
endpoint=self.endpoint,
)
20 changes: 0 additions & 20 deletions workflows/tests/test_author_create_tasks.py

This file was deleted.

0 comments on commit 1ab97e1

Please sign in to comment.