airflow: fixing snowticket workflow
DonHaul committed Nov 25, 2024
1 parent a323f49 commit 03deff4
Showing 19 changed files with 630 additions and 208 deletions.
1 change: 1 addition & 0 deletions Makefile
@@ -26,6 +26,7 @@ django-setup:
	docker compose exec backoffice-webserver python manage.py create_groups
	docker compose exec backoffice-webserver python manage.py loaddata backoffice/users/fixtures/users.json
	docker compose exec backoffice-webserver python manage.py loaddata backoffice/users/fixtures/tokens.json
+	docker compose exec backoffice-webserver python manage.py loaddata backoffice/authors/fixtures/workflows.json
	echo "\033[1;32memail: [email protected] / password: admin \033[0m"
	echo "Backoffice initialized"

18 changes: 12 additions & 6 deletions backoffice/backoffice/authors/airflow_utils.py
@@ -5,19 +5,19 @@
from django.http import JsonResponse
from requests.exceptions import RequestException
from rest_framework import status
+import json
+from django.core.serializers.json import DjangoJSONEncoder

from backoffice.authors.constants import WORKFLOW_DAGS

AIRFLOW_BASE_URL = environ.get("AIRFLOW_BASE_URL")

-AIRFLOW_HEADERS = {
-    "Authorization": f"Basic {environ.get('AIRFLOW_TOKEN')}",
-}
+AIRFLOW_HEADERS = {"Authorization": f"Basic {environ.get('AIRFLOW_TOKEN')}"}

logger = logging.getLogger(__name__)


-def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
+def trigger_airflow_dag(dag_id, workflow_id, extra_data=None, workflow=None):
    """Triggers an airflow dag.
    :param dag_id: name of the dag to run
@@ -27,8 +27,10 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):

    data = {"dag_run_id": str(workflow_id), "conf": {"workflow_id": str(workflow_id)}}

-    if extra_data is not None:
+    if extra_data:
        data["conf"]["data"] = extra_data
+    if workflow:
+        data["conf"]["workflow"] = workflow

    url = f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns"

@@ -39,7 +41,11 @@
            data,
            url,
        )
-        response = requests.post(url, json=data, headers=AIRFLOW_HEADERS)
+        response = requests.post(
+            url,
+            data=json.dumps(data, cls=DjangoJSONEncoder),
+            headers=AIRFLOW_HEADERS | {"Content-Type": "application/json"},
+        )
        response.raise_for_status()
        return JsonResponse(response.json())
    except RequestException:
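The switch from `requests.post(url, json=data, ...)` to an explicit `json.dumps(data, cls=DjangoJSONEncoder)` is what lets the serialized workflow ride along in `conf`: serializer output can carry `datetime` and `UUID` values that the stdlib JSON encoder rejects. A minimal standalone sketch of the difference (illustrative values, not part of the commit):

import json
import uuid
from datetime import datetime

from django.core.serializers.json import DjangoJSONEncoder

# Values of the kind a serialized workflow carries.
payload = {"workflow_id": uuid.uuid4(), "created": datetime.now()}

# json.dumps(payload) raises TypeError: Object of type UUID is not JSON serializable.
# DjangoJSONEncoder renders UUID/datetime/Decimal values as strings instead:
print(json.dumps(payload, cls=DjangoJSONEncoder))

Because the body is now passed as a pre-encoded string via `data=`, requests no longer sets the Content-Type header itself, hence the explicit `{"Content-Type": "application/json"}` merged into `AIRFLOW_HEADERS`.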
8 changes: 5 additions & 3 deletions backoffice/backoffice/authors/api/views.py
@@ -140,7 +140,7 @@ def create(self, request):
        airflow_utils.trigger_airflow_dag(
            WORKFLOW_DAGS[workflow.workflow_type].initialize,
            str(workflow.id),
-            workflow.data,
+            workflow=serializer.data,
        )
        return Response(serializer.data, status=status.HTTP_201_CREATED)

@@ -177,18 +177,20 @@ def resolve(self, request, pk=None):
        logger.info("Resolving data: %s", request.data)
        serializer = self.resolution_serializer(data=request.data)
        if serializer.is_valid(raise_exception=True):
-            extra_data = serializer.validated_data
            logger.info(
                "Trigger Airflow DAG: %s for %s",
                AuthorResolutionDags[serializer.validated_data["value"]],
                pk,
            )
            utils.add_decision(pk, request.user, serializer.validated_data["value"])

+            workflow = self.get_serializer(AuthorWorkflow.objects.get(pk=pk)).data

            airflow_utils.trigger_airflow_dag(
                AuthorResolutionDags[serializer.validated_data["value"]].label,
                pk,
-                extra_data,
+                serializer.data,
+                workflow=workflow,
            )
            workflow_serializer = self.serializer_class(
                get_object_or_404(AuthorWorkflow, pk=pk)
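With both call sites updated, every DAG trigger now ships a `conf` of the same shape: the submission or resolution payload under `data` and the freshly serialized workflow under `workflow`. A sketch of the resulting dagRun payload, with values taken from the recorded cassette further down:

# Shape of the body trigger_airflow_dag posts to /api/v1/dags/{dag_id}/dagRuns
# (values are the ones recorded in the test cassette, not production data):
data = {
    "dag_run_id": "00000000-0000-0000-0000-000000000001",
    "conf": {
        "workflow_id": "00000000-0000-0000-0000-000000000001",
        "data": {"test": "test"},    # extra_data, e.g. serializer.data in resolve()
        "workflow": {"id": "id"},    # serialized workflow passed via the new kwarg
    },
}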
30 changes: 30 additions & 0 deletions backoffice/backoffice/authors/fixtures/workflows.json
@@ -0,0 +1,30 @@
+[
+  {
+    "model": "authors.authorworkflow",
+    "pk": "00000000-0000-0000-0000-000000001521",
+    "fields": {
+      "workflow_type": "AUTHOR_CREATE",
+      "data": {
+        "name": {
+          "value": "B, Third",
+          "preferred_name": "Third B"
+        },
+        "status": "active",
+        "_collections": [
+          "Authors"
+        ],
+        "acquisition_source": {
+          "email": "[email protected]",
+          "orcid": "0000-0000-0000-0000",
+          "method": "submitter",
+          "source": "submitter",
+          "datetime": "2024-11-18T11:34:19.809575",
+          "internal_uid": 50872
+        }
+      },
+      "status": "running",
+      "_created_at": "2024-11-25T13:49:53.009Z",
+      "_updated_at": "2024-11-25T13:49:54.756Z"
+    }
+  }
+]
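This fixture seeds local environments with one running AUTHOR_CREATE workflow; the new Makefile line above loads it during `django-setup`. A quick sanity check from a Django shell (the model import path is assumed here):

# Hypothetical check that the fixture loaded; import path is an assumption.
from backoffice.authors.models import AuthorWorkflow

wf = AuthorWorkflow.objects.get(pk="00000000-0000-0000-0000-000000001521")
print(wf.workflow_type, wf.status)  # expected: AUTHOR_CREATE running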
@@ -1,7 +1,8 @@
interactions:
- request:
    body: '{"dag_run_id": "00000000-0000-0000-0000-000000000001", "conf": {"workflow_id":
-      "00000000-0000-0000-0000-000000000001", "data": {"test": "test"}}}'
+      "00000000-0000-0000-0000-000000000001", "data": {"test": "test"}, "workflow":
+      {"id": "id"}}}'
    headers:
      Accept:
      - '*/*'
@@ -10,33 +11,33 @@ interactions:
      Connection:
      - keep-alive
      Content-Length:
-      - '145'
+      - '171'
      Content-Type:
      - application/json
    method: POST
    uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns
  response:
    body:
      string: "{\n \"conf\": {\n \"data\": {\n \"test\": \"test\"\n },\n
-        \ \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n },\n \"dag_id\":
-        \"author_create_initialization_dag\",\n \"dag_run_id\": \"00000000-0000-0000-0000-000000000001\",\n
-        \ \"data_interval_end\": \"2024-10-18T11:54:51.246355+00:00\",\n \"data_interval_start\":
-        \"2024-10-18T11:54:51.246355+00:00\",\n \"end_date\": null,\n \"execution_date\":
-        \"2024-10-18T11:54:51.246355+00:00\",\n \"external_trigger\": true,\n \"last_scheduling_decision\":
-        null,\n \"logical_date\": \"2024-10-18T11:54:51.246355+00:00\",\n \"note\":
-        null,\n \"run_type\": \"manual\",\n \"start_date\": null,\n \"state\":
-        \"queued\"\n}\n"
+        \ \"workflow\": {\n \"id\": \"id\"\n },\n \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n
+        \ },\n \"dag_id\": \"author_create_initialization_dag\",\n \"dag_run_id\":
+        \"00000000-0000-0000-0000-000000000001\",\n \"data_interval_end\": \"2024-11-20T14:25:30.752617+00:00\",\n
+        \ \"data_interval_start\": \"2024-11-20T14:25:30.752617+00:00\",\n \"end_date\":
+        null,\n \"execution_date\": \"2024-11-20T14:25:30.752617+00:00\",\n \"external_trigger\":
+        true,\n \"last_scheduling_decision\": null,\n \"logical_date\": \"2024-11-20T14:25:30.752617+00:00\",\n
+        \ \"note\": null,\n \"run_type\": \"manual\",\n \"start_date\": null,\n
+        \ \"state\": \"queued\"\n}\n"
    headers:
      Cache-Control:
      - no-store
      Connection:
      - close
      Content-Length:
-      - '621'
+      - '663'
      Content-Type:
      - application/json
      Date:
-      - Fri, 18 Oct 2024 11:54:51 GMT
+      - Wed, 20 Nov 2024 14:25:30 GMT
      Server:
      - gunicorn
      X-Robots-Tag:
@@ -68,7 +69,7 @@ interactions:
      Content-Type:
      - application/json
      Date:
-      - Fri, 18 Oct 2024 11:54:51 GMT
+      - Wed, 20 Nov 2024 14:25:30 GMT
      Server:
      - gunicorn
      X-Robots-Tag:
13 changes: 11 additions & 2 deletions backoffice/backoffice/authors/tests/test_airflow_utils.py
@@ -1,5 +1,5 @@
import uuid
-
+import json
import pytest
from backoffice.authors import airflow_utils
from backoffice.authors.constants import WORKFLOW_DAGS, WorkflowType
@@ -11,16 +11,25 @@ def setUp(self):
        self.workflow_id = uuid.UUID(int=1)
        self.workflow_type = WorkflowType.AUTHOR_CREATE
        self.dag_id = WORKFLOW_DAGS[self.workflow_type].initialize
+        self.extra_data = {"test": "test"}
+        self.workflow_serialized = {"id": "id"}

        self.response = airflow_utils.trigger_airflow_dag(
-            self.dag_id, str(self.workflow_id), {"test": "test"}
+            self.dag_id,
+            str(self.workflow_id),
+            self.extra_data,
+            self.workflow_serialized,
        )

    def tearDown(self):
        airflow_utils.delete_workflow_dag(self.dag_id, self.workflow_id)

    @pytest.mark.vcr
    def test_trigger_airflow_dag(self):
+        json_content = json.loads(self.response.content)
        self.assertEqual(self.response.status_code, 200)
+        self.assertEqual(json_content["conf"]["data"], self.extra_data)
+        self.assertEqual(json_content["conf"]["workflow"], self.workflow_serialized)

    @pytest.mark.vcr
    def test_restart_failed_tasks(self):
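The strengthened test now decodes the `JsonResponse` body and asserts that Airflow echoed back both parts of the `conf`. Trimmed to the asserted keys, the decoded content from the cassette looks roughly like:

# Sketch of json.loads(self.response.content), trimmed to the asserted keys:
json_content = {
    "conf": {
        "data": {"test": "test"},    # compared against self.extra_data
        "workflow": {"id": "id"},    # compared against self.workflow_serialized
        "workflow_id": "00000000-0000-0000-0000-000000000001",
    },
    "dag_id": "author_create_initialization_dag",
    "state": "queued",
}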
1 change: 0 additions & 1 deletion backoffice/backoffice/authors/tests/test_views.py
@@ -305,7 +305,6 @@ def test_create_happy_flow(self):
"ticket_id": "dc94caad1b4f71502d06117a3b4bcb25",
"ticket_type": "author_create_user",
}
# import ipdb; ipdb.set_trace()
response = self.api_client.post(self.endpoint, format="json", data=data)

assert response.status_code == 201