Skip to content

Commit

Permalink
airflow: fixing snowticket workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul committed Nov 20, 2024
1 parent 681c501 commit 3774928
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 137 deletions.
11 changes: 9 additions & 2 deletions backoffice/backoffice/authors/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@
from django.http import JsonResponse
from requests.exceptions import RequestException
from rest_framework import status
import json
from django.core.serializers.json import DjangoJSONEncoder

from backoffice.authors.constants import WORKFLOW_DAGS

AIRFLOW_BASE_URL = environ.get("AIRFLOW_BASE_URL")

AIRFLOW_HEADERS = {
"Authorization": f"Basic {environ.get('AIRFLOW_TOKEN')}",
"Content-Type": "application/json",
}

logger = logging.getLogger(__name__)


def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
def trigger_airflow_dag(dag_id, workflow_id, extra_data=None, workflow=None):
"""Triggers an airflow dag.
:param dag_id: name of the dag to run
Expand All @@ -29,6 +32,8 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):

if extra_data is not None:
data["conf"]["data"] = extra_data
if workflow is not None:
data["conf"]["workflow"] = workflow

url = f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns"

Expand All @@ -39,7 +44,9 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
data,
url,
)
response = requests.post(url, json=data, headers=AIRFLOW_HEADERS)
response = requests.post(
url, data=json.dumps(data, cls=DjangoJSONEncoder), headers=AIRFLOW_HEADERS
)
response.raise_for_status()
return JsonResponse(response.json())
except RequestException:
Expand Down
8 changes: 5 additions & 3 deletions backoffice/backoffice/authors/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def create(self, request):
airflow_utils.trigger_airflow_dag(
WORKFLOW_DAGS[workflow.workflow_type].initialize,
str(workflow.id),
workflow.data,
workflow=serializer.data,
)
return Response(serializer.data, status=status.HTTP_201_CREATED)

Expand Down Expand Up @@ -177,18 +177,20 @@ def resolve(self, request, pk=None):
logger.info("Resolving data: %s", request.data)
serializer = self.resolution_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
extra_data = serializer.validated_data
logger.info(
"Trigger Airflow DAG: %s for %s",
AuthorResolutionDags[serializer.validated_data["value"]],
pk,
)
utils.add_decision(pk, request.user, serializer.validated_data["value"])

workflow = self.get_serializer(AuthorWorkflow.objects.get(pk=pk)).data

airflow_utils.trigger_airflow_dag(
AuthorResolutionDags[serializer.validated_data["value"]].label,
pk,
extra_data,
serializer.data,
workflow=workflow,
)
workflow_serializer = self.serializer_class(
get_object_or_404(AuthorWorkflow, pk=pk)
Expand Down
111 changes: 60 additions & 51 deletions workflows/dags/author/author_create/author_create_approved.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime
import logging
from airflow.utils.trigger_rule import TriggerRule
from include.utils.tickets import get_ticket_by_type

from airflow.decorators import dag, task
from airflow.models.param import Param
from author.author_create.shared_tasks import close_author_create_user_ticket
from hooks.backoffice.workflow_management_hook import AUTHORS, WorkflowManagementHook
from hooks.backoffice.workflow_ticket_management_hook import (
AuthorWorkflowTicketManagementHook,
Expand All @@ -30,6 +31,7 @@
schedule=None,
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
tags=["authors"]
)
def author_create_approved_dag():
"""Defines the DAG for the author creation workflow after curator's approval.
Expand All @@ -48,43 +50,47 @@ def author_create_approved_dag():
"""
inspire_http_hook = InspireHttpHook()
inspire_http_record_management_hook = InspireHTTPRecordManagementHook()
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)
workflow_ticket_management_hook = AuthorWorkflowTicketManagementHook()

@task()
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
workflow_id=context["params"]["workflow_id"]
)

@task.branch()
def author_check_approval_branch(**context: dict) -> None:
"""Branching for the workflow: based on create_ticket parameter
"""Branching for the workflow: based on value parameter
dag goes either to create_ticket_on_author_approval task or
directly to create_author_on_inspire
"""
if context["params"]["create_ticket"]:
if "accept_curate" == context["params"]["data"]["value"]:
return "create_author_create_curation_ticket"
else:
return "empty_task"
return "close_author_create_user_ticket"

@task
def create_author_create_curation_ticket(**context: dict) -> None:
endpoint = "api/tickets/create"
request_data = {
"functional_category": "",
"workflow_id": context["params"]["workflow_id"],
"subject": "test", # TODO: update subject and description
"description": "test",
"caller_email": "", # leave empty
"template": "curation_needed_author", # TODO: check template
}
response = inspire_http_hook.call_api(
endpoint=endpoint, data=request_data, method="POST"
)

workflow_data = context["params"]["workflow"]["data"]
email = workflow_data["acquisition_source"]["email"]

bai = "[{}]".format(workflow_data.get("bai")) if workflow_data.get("bai") else ""
control_number = context["ti"].xcom_pull(task_ids="create_author_on_inspire", key="control_number")
response = inspire_http_hook.create_ticket(
"Author curation",
"curation_needed_author",
f"Curation needed for author {workflow_data.get('name').get('preferred_name')} {bai}",
email,
{"email": email, "record_url": f"{inspire_http_hook.base_url}/authors/{control_number}"})





workflow_ticket_management_hook.create_ticket_entry(
workflow_id=context["params"]["workflow_id"],
ticket_id=response.json()["ticket_id"],
Expand All @@ -102,13 +108,14 @@ def create_author_on_inspire(**context: dict) -> str:
status = get_wf_status_from_inspire_response(response)
if response.ok:
control_number = response.json()["metadata"]["control_number"]
context["ti"].xcom_push(key="control_number", value=control_number)
logger.info(f"Created author with control number: {control_number}")
workflow_data["data"]["control_number"] = control_number
workflow_management_hook.partial_update_workflow(
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={"data": workflow_data["data"]},
collection=AUTHORS,
workflow_partial_update_data={"data": workflow_data["data"]}
)
logger.info(f"Workflow status: {status}")
return status

@task.branch()
Expand All @@ -125,14 +132,25 @@ def set_author_create_workflow_status_to_completed(**context: dict) -> None:
status_name = "completed"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
workflow_id=context["params"]["workflow_id"]
)

@task
def empty_task() -> None:
# Logic to combine the results of branches
pass
@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def close_author_create_user_ticket(**context: dict) -> None:
    """Close this workflow's "author_create_user" ticket.

    Reads the serialized workflow and the workflow id from the DAG run
    params, pulls the value stored under the "control_number" XCom key by
    the ``create_author_on_inspire`` task, and asks the INSPIRE hook to
    close the ticket, passing "user_accepted_author" together with the
    user name, author name and record URL.

    :param context: Airflow task context; ``params`` and ``ti`` are read.
    """
    run_params = context["params"]

    user_ticket = get_ticket_by_type(run_params["workflow"], "author_create_user")
    user_ticket_id = user_ticket["ticket_id"]

    author_data = run_params["workflow"]["data"]
    submitter_email = author_data["acquisition_source"]["email"]
    control_number = context["ti"].xcom_pull(
        task_ids="create_author_on_inspire", key="control_number"
    )

    # NOTE(review): the returned URL is discarded. Presumably this call is
    # kept for a side effect on the hook (e.g. initialising state that
    # ``base_url`` below depends on) -- confirm before removing or moving it.
    inspire_http_hook.get_backoffice_url(run_params["workflow_id"])

    # Falls back to the submitter's email when no given names are present.
    closing_payload = {
        "user_name": author_data["acquisition_source"].get(
            "given_names", submitter_email
        ),
        "author_name": author_data.get("name").get("preferred_name"),
        "record_url": f"{inspire_http_hook.base_url}/authors/{control_number}",
    }
    inspire_http_hook.close_ticket(
        user_ticket_id, "user_accepted_author", closing_payload
    )

@task()
def set_author_create_workflow_status_to_error(**context: dict) -> None:
Expand All @@ -142,7 +160,6 @@ def set_author_create_workflow_status_to_error(**context: dict) -> None:
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
)

# task definitions
Expand All @@ -153,36 +170,28 @@ def set_author_create_workflow_status_to_error(**context: dict) -> None:
close_author_create_user_ticket_task = close_author_create_user_ticket()
create_author_create_curation_ticket_task = create_author_create_curation_ticket()
set_workflow_status_to_completed_task = (
set_author_create_workflow_status_to_completed()
set_author_create_workflow_status_to_completed()
)
set_workflow_status_to_error_task = set_author_create_workflow_status_to_error()
combine_ticket_and_no_ticket_task = empty_task()

# task dependencies
ticket_branch = create_author_create_curation_ticket_task
(
ticket_branch
>> close_author_create_user_ticket_task
>> set_workflow_status_to_completed_task
)


no_ticket_branch = combine_ticket_and_no_ticket_task
(
no_ticket_branch
>> close_author_create_user_ticket_task
>> set_workflow_status_to_completed_task
)

author_check_approval_branch_task >> [ticket_branch, no_ticket_branch]
# task dependencies
(
set_status_to_running_task
>> create_author_on_inspire_task
>> author_create_success_branch_task
set_status_to_running_task
>> create_author_on_inspire_task
>> author_create_success_branch_task
)
author_create_success_branch_task >> [
author_check_approval_branch_task,
set_workflow_status_to_error_task,
author_check_approval_branch_task,
set_workflow_status_to_error_task,
]

(
[author_check_approval_branch_task >> create_author_create_curation_ticket_task,
author_check_approval_branch_task,
]
>> close_author_create_user_ticket_task
>> set_workflow_status_to_completed_task
)

author_create_approved_dag()
60 changes: 35 additions & 25 deletions workflows/dags/author/author_create/author_create_init.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import datetime
import logging

from airflow.hooks.base import BaseHook
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.models.param import Param
from hooks.backoffice.workflow_management_hook import AUTHORS, WorkflowManagementHook
from hooks.backoffice.workflow_ticket_management_hook import (
AuthorWorkflowTicketManagementHook,
)
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from hooks.inspirehep.inspire_http_hook import InspireHttpHook, AUTHOR_SUBMIT_FUNCTIONAL_CATEGORY
from include.utils.set_workflow_status import set_workflow_status_to_error

logger = logging.getLogger(__name__)
Expand All @@ -24,6 +24,7 @@
catchup=False,
# TODO: what if callback fails? Data in backoffice not up to date!
on_failure_callback=set_workflow_status_to_error,
tags=["authors"]
)
def author_create_initialization_dag():
"""
Expand All @@ -37,16 +38,15 @@ def author_create_initialization_dag():
"""
inspire_http_hook = InspireHttpHook()
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)
workflow_ticket_management_hook = AuthorWorkflowTicketManagementHook()

@task()
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
workflow_id=context["params"]["workflow_id"]
)

@task()
Expand All @@ -55,40 +55,50 @@ def set_schema(**context):
workflow_management_hook.partial_update_workflow(
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={
"data": {**context["params"]["data"], "$schema": schema}
},
collection=AUTHORS,
"data": {**context["params"]["workflow"]["data"], "$schema": schema}
}
)

@task()
def create_author_create_user_ticket(**context: dict) -> None:
endpoint = "/api/tickets/create"
request_data = {
"functional_category": "Author curation",
"template": "user_new_author",
"workflow_id": context["params"]["workflow_id"],
"subject": "test", # TODO: set the subject and description
"description": "test",
"caller_email": "", # leave empty
}
response = inspire_http_hook.call_api(
endpoint=endpoint, data=request_data, method="POST"
)
logger.info(f"Ticket created. Response status code: {response.status_code}")
logger.info(response.json())

workflow_data = context["params"]["workflow"]["data"]
email = workflow_data["acquisition_source"]["email"]

response = inspire_http_hook.create_ticket(
AUTHOR_SUBMIT_FUNCTIONAL_CATEGORY,
"curator_new_author",
f"Your suggestion to INSPIRE: author {workflow_data.get('name').get('preferred_name')}",
workflow_data["acquisition_source"]["email"],
{
"email": email,
"obj_url":inspire_http_hook.get_backoffice_url(context["params"]["workflow_id"])
})

ticket_id = response.json()["ticket_id"]

response = inspire_http_hook.reply_ticket(
ticket_id,
"user_new_author",
{
"user_name":workflow_data["acquisition_source"].get('given_names',email),
"author_name": workflow_data.get("name").get("preferred_name")
},
email)


workflow_ticket_management_hook.create_ticket_entry(
workflow_id=context["params"]["workflow_id"],
ticket_type="author_create_user",
ticket_id=response.json()["ticket_id"],
ticket_id=ticket_id,
)

@task()
def set_author_create_workflow_status_to_approval(**context: dict) -> None:
status_name = "approval"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
workflow_id=context["params"]["workflow_id"]
)

# task dependencies
Expand Down
Loading

0 comments on commit 3774928

Please sign in to comment.