Skip to content

Commit

Permalink
airflow: fixing snowticket workflow
Browse files Browse the repository at this point in the history
* ref: cern-sis/issues-inspire/issues/614

rough draft

	deleted:    workflows/dags/author/author_create/shared_tasks.py
  • Loading branch information
DonHaul committed Nov 25, 2024
1 parent a323f49 commit 31f1e78
Show file tree
Hide file tree
Showing 16 changed files with 509 additions and 208 deletions.
18 changes: 12 additions & 6 deletions backoffice/backoffice/authors/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
from django.http import JsonResponse
from requests.exceptions import RequestException
from rest_framework import status
import json
from django.core.serializers.json import DjangoJSONEncoder

from backoffice.authors.constants import WORKFLOW_DAGS

AIRFLOW_BASE_URL = environ.get("AIRFLOW_BASE_URL")

AIRFLOW_HEADERS = {
"Authorization": f"Basic {environ.get('AIRFLOW_TOKEN')}",
}
AIRFLOW_HEADERS = {"Authorization": f"Basic {environ.get('AIRFLOW_TOKEN')}"}

logger = logging.getLogger(__name__)


def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
def trigger_airflow_dag(dag_id, workflow_id, extra_data=None, workflow=None):
"""Triggers an airflow dag.
:param dag_id: name of the dag to run
Expand All @@ -27,8 +27,10 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):

data = {"dag_run_id": str(workflow_id), "conf": {"workflow_id": str(workflow_id)}}

if extra_data is not None:
if extra_data:
data["conf"]["data"] = extra_data
if workflow:
data["conf"]["workflow"] = workflow

url = f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns"

Expand All @@ -39,7 +41,11 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
data,
url,
)
response = requests.post(url, json=data, headers=AIRFLOW_HEADERS)
response = requests.post(
url,
data=json.dumps(data, cls=DjangoJSONEncoder),
headers=AIRFLOW_HEADERS | {"Content-Type": "application/json"},
)
response.raise_for_status()
return JsonResponse(response.json())
except RequestException:
Expand Down
8 changes: 5 additions & 3 deletions backoffice/backoffice/authors/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def create(self, request):
airflow_utils.trigger_airflow_dag(
WORKFLOW_DAGS[workflow.workflow_type].initialize,
str(workflow.id),
workflow.data,
workflow=serializer.data,
)
return Response(serializer.data, status=status.HTTP_201_CREATED)

Expand Down Expand Up @@ -177,18 +177,20 @@ def resolve(self, request, pk=None):
logger.info("Resolving data: %s", request.data)
serializer = self.resolution_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
extra_data = serializer.validated_data
logger.info(
"Trigger Airflow DAG: %s for %s",
AuthorResolutionDags[serializer.validated_data["value"]],
pk,
)
utils.add_decision(pk, request.user, serializer.validated_data["value"])

workflow = self.get_serializer(AuthorWorkflow.objects.get(pk=pk)).data

airflow_utils.trigger_airflow_dag(
AuthorResolutionDags[serializer.validated_data["value"]].label,
pk,
extra_data,
serializer.data,
workflow=workflow,
)
workflow_serializer = self.serializer_class(
get_object_or_404(AuthorWorkflow, pk=pk)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
interactions:
- request:
body: '{"dag_run_id": "00000000-0000-0000-0000-000000000001", "conf": {"workflow_id":
"00000000-0000-0000-0000-000000000001", "data": {"test": "test"}}}'
"00000000-0000-0000-0000-000000000001", "data": {"test": "test"}, "workflow":
{"id": "id"}}}'
headers:
Accept:
- '*/*'
Expand All @@ -10,33 +11,33 @@ interactions:
Connection:
- keep-alive
Content-Length:
- '145'
- '171'
Content-Type:
- application/json
method: POST
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns
response:
body:
string: "{\n \"conf\": {\n \"data\": {\n \"test\": \"test\"\n },\n
\ \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n },\n \"dag_id\":
\"author_create_initialization_dag\",\n \"dag_run_id\": \"00000000-0000-0000-0000-000000000001\",\n
\ \"data_interval_end\": \"2024-10-18T11:54:51.246355+00:00\",\n \"data_interval_start\":
\"2024-10-18T11:54:51.246355+00:00\",\n \"end_date\": null,\n \"execution_date\":
\"2024-10-18T11:54:51.246355+00:00\",\n \"external_trigger\": true,\n \"last_scheduling_decision\":
null,\n \"logical_date\": \"2024-10-18T11:54:51.246355+00:00\",\n \"note\":
null,\n \"run_type\": \"manual\",\n \"start_date\": null,\n \"state\":
\"queued\"\n}\n"
\ \"workflow\": {\n \"id\": \"id\"\n },\n \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n
\ },\n \"dag_id\": \"author_create_initialization_dag\",\n \"dag_run_id\":
\"00000000-0000-0000-0000-000000000001\",\n \"data_interval_end\": \"2024-11-20T14:25:30.752617+00:00\",\n
\ \"data_interval_start\": \"2024-11-20T14:25:30.752617+00:00\",\n \"end_date\":
null,\n \"execution_date\": \"2024-11-20T14:25:30.752617+00:00\",\n \"external_trigger\":
true,\n \"last_scheduling_decision\": null,\n \"logical_date\": \"2024-11-20T14:25:30.752617+00:00\",\n
\ \"note\": null,\n \"run_type\": \"manual\",\n \"start_date\": null,\n
\ \"state\": \"queued\"\n}\n"
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Length:
- '621'
- '663'
Content-Type:
- application/json
Date:
- Fri, 18 Oct 2024 11:54:51 GMT
- Wed, 20 Nov 2024 14:25:30 GMT
Server:
- gunicorn
X-Robots-Tag:
Expand Down Expand Up @@ -68,7 +69,7 @@ interactions:
Content-Type:
- application/json
Date:
- Fri, 18 Oct 2024 11:54:51 GMT
- Wed, 20 Nov 2024 14:25:30 GMT
Server:
- gunicorn
X-Robots-Tag:
Expand Down
13 changes: 11 additions & 2 deletions backoffice/backoffice/authors/tests/test_airflow_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid

import json
import pytest
from backoffice.authors import airflow_utils
from backoffice.authors.constants import WORKFLOW_DAGS, WorkflowType
Expand All @@ -11,16 +11,25 @@ def setUp(self):
self.workflow_id = uuid.UUID(int=1)
self.workflow_type = WorkflowType.AUTHOR_CREATE
self.dag_id = WORKFLOW_DAGS[self.workflow_type].initialize
self.extra_data = {"test": "test"}
self.workflow_serialized = {"id": "id"}

self.response = airflow_utils.trigger_airflow_dag(
self.dag_id, str(self.workflow_id), {"test": "test"}
self.dag_id,
str(self.workflow_id),
self.extra_data,
self.workflow_serialized,
)

def tearDown(self):
airflow_utils.delete_workflow_dag(self.dag_id, self.workflow_id)

@pytest.mark.vcr
def test_trigger_airflow_dag(self):
json_content = json.loads(self.response.content)
self.assertEqual(self.response.status_code, 200)
self.assertEqual(json_content["conf"]["data"], self.extra_data)
self.assertEqual(json_content["conf"]["workflow"], self.workflow_serialized)

@pytest.mark.vcr
def test_restart_failed_tasks(self):
Expand Down
1 change: 0 additions & 1 deletion backoffice/backoffice/authors/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ def test_create_happy_flow(self):
"ticket_id": "dc94caad1b4f71502d06117a3b4bcb25",
"ticket_type": "author_create_user",
}
# import ipdb; ipdb.set_trace()
response = self.api_client.post(self.endpoint, format="json", data=data)

assert response.status_code == 201
Expand Down
99 changes: 63 additions & 36 deletions workflows/dags/author/author_create/author_create_approved.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from author.author_create.shared_tasks import close_author_create_user_ticket
from airflow.utils.trigger_rule import TriggerRule
from hooks.backoffice.workflow_management_hook import AUTHORS, WorkflowManagementHook
from hooks.backoffice.workflow_ticket_management_hook import (
AuthorWorkflowTicketManagementHook,
)
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from hooks.inspirehep.inspire_http_hook import (
AUTHOR_CURATION_FUNCTIONAL_CATEGORY,
InspireHttpHook,
)
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
)
from include.utils.set_workflow_status import (
get_wf_status_from_inspire_response,
set_workflow_status_to_error,
)
from include.utils.tickets import get_ticket_by_type

logger = logging.getLogger(__name__)

Expand All @@ -30,6 +34,7 @@
schedule=None,
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
tags=["authors"],
)
def author_create_approved_dag():
"""Defines the DAG for the author creation workflow after curator's approval.
Expand Down Expand Up @@ -60,29 +65,40 @@ def set_workflow_status_to_running(**context):

@task.branch()
def author_check_approval_branch(**context: dict) -> None:
"""Branching for the workflow: based on create_ticket parameter
"""Branching for the workflow: based on value parameter
dag goes either to create_ticket_on_author_approval task or
directly to create_author_on_inspire
"""
if context["params"]["create_ticket"]:
if context["params"]["data"]["value"] == "accept_curate":
return "create_author_create_curation_ticket"
else:
return "empty_task"
return "close_author_create_user_ticket"

@task
def create_author_create_curation_ticket(**context: dict) -> None:
endpoint = "api/tickets/create"
request_data = {
"functional_category": "",
"workflow_id": context["params"]["workflow_id"],
"subject": "test", # TODO: update subject and description
"description": "test",
"caller_email": "", # leave empty
"template": "curation_needed_author", # TODO: check template
}
response = inspire_http_hook.call_api(
endpoint=endpoint, data=request_data, method="POST"
workflow_data = context["params"]["workflow"]["data"]
email = workflow_data["acquisition_source"]["email"]

bai = f"[{workflow_data.get('bai')}]" if workflow_data.get("bai") else ""

control_number = context["ti"].xcom_pull(
task_ids="create_author_on_inspire", key="control_number"
)

inspire_http_hook.get_conn()

response = inspire_http_hook.create_ticket(
AUTHOR_CURATION_FUNCTIONAL_CATEGORY,
"curation_needed_author",
f"Curation needed for author"
f" {workflow_data.get('name').get('preferred_name')} {bai}",
email,
{
"email": email,
"record_url": f"{inspire_http_hook.base_url}/authors/{control_number}",
},
)

workflow_ticket_management_hook.create_ticket_entry(
workflow_id=context["params"]["workflow_id"],
ticket_id=response.json()["ticket_id"],
Expand All @@ -100,12 +116,14 @@ def create_author_on_inspire(**context: dict) -> str:
status = get_wf_status_from_inspire_response(response)
if response.ok:
control_number = response.json()["metadata"]["control_number"]
context["ti"].xcom_push(key="control_number", value=control_number)
logger.info(f"Created author with control number: {control_number}")
workflow_data["data"]["control_number"] = control_number
workflow_management_hook.partial_update_workflow(
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={"data": workflow_data["data"]},
)
logger.info(f"Workflow status: {status}")
return status

@task.branch()
Expand All @@ -124,10 +142,26 @@ def set_author_create_workflow_status_to_completed(**context: dict) -> None:
status_name=status_name, workflow_id=context["params"]["workflow_id"]
)

@task
def empty_task() -> None:
# Logic to combine the results of branches
pass
@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def close_author_create_user_ticket(**context: dict) -> None:
ticket_id = get_ticket_by_type(
context["params"]["workflow"], "author_create_user"
)["ticket_id"]

workflow_data = context["params"]["workflow"]["data"]
email = workflow_data["acquisition_source"]["email"]
control_number = context["ti"].xcom_pull(
task_ids="create_author_on_inspire", key="control_number"
)

inspire_http_hook.get_conn()

request_data = {
"user_name": workflow_data["acquisition_source"].get("given_names", email),
"author_name": workflow_data.get("name").get("preferred_name"),
"record_url": f"{inspire_http_hook.base_url}/authors/{control_number}",
}
inspire_http_hook.close_ticket(ticket_id, "user_accepted_author", request_data)

@task()
def set_author_create_workflow_status_to_error(**context: dict) -> None:
Expand All @@ -150,24 +184,8 @@ def set_author_create_workflow_status_to_error(**context: dict) -> None:
set_author_create_workflow_status_to_completed()
)
set_workflow_status_to_error_task = set_author_create_workflow_status_to_error()
combine_ticket_and_no_ticket_task = empty_task()

# task dependencies
ticket_branch = create_author_create_curation_ticket_task
(
ticket_branch
>> close_author_create_user_ticket_task
>> set_workflow_status_to_completed_task
)

no_ticket_branch = combine_ticket_and_no_ticket_task
(
no_ticket_branch
>> close_author_create_user_ticket_task
>> set_workflow_status_to_completed_task
)

author_check_approval_branch_task >> [ticket_branch, no_ticket_branch]
(
set_status_to_running_task
>> create_author_on_inspire_task
Expand All @@ -177,6 +195,15 @@ def set_author_create_workflow_status_to_error(**context: dict) -> None:
author_check_approval_branch_task,
set_workflow_status_to_error_task,
]
(
[
author_check_approval_branch_task
>> create_author_create_curation_ticket_task,
author_check_approval_branch_task,
]
>> close_author_create_user_ticket_task
>> set_workflow_status_to_completed_task
)


author_create_approved_dag()
Loading

0 comments on commit 31f1e78

Please sign in to comment.