Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

Commit

Permalink
restart actions: rework
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul committed Jul 29, 2024
1 parent 590db48 commit 80e05d3
Show file tree
Hide file tree
Showing 10 changed files with 385 additions and 266 deletions.
105 changes: 105 additions & 0 deletions backoffice/backoffice/workflows/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from requests.exceptions import RequestException
from rest_framework import status

from backoffice.workflows.constants import WORKFLOW_DAGS

AIRFLOW_BASE_URL = environ.get("AIRFLOW_BASE_URL")

AIRFLOW_HEADERS = {
Expand Down Expand Up @@ -45,3 +47,106 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
except RequestException:
data = {"error": response.json()}
return JsonResponse(data, status=status.HTTP_502_BAD_GATEWAY)


def restart_failed_tasks(workflow):
"""Restarts failed tasks of an airflow dag.
:param workflow: workflow whooses tasks should be restarted
:returns: request response
"""

dag_id = find_failed_dag(workflow)

# assumes current task is one of the failed tasks
data = {
"dry_run": False,
"dag_run_id": str(workflow.id),
"reset_dag_runs": False,
"only_failed": True,
}

try:
response = requests.post(
f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/clearTaskInstances",
json=data,
headers=AIRFLOW_HEADERS,
)
response.raise_for_status()
return JsonResponse(response.json())
except RequestException:
data = {"error": response.json()}
return JsonResponse(data, status=status.HTTP_424_FAILED_DEPENDENCY)


def find_executed_dags(workflow):
"""For a given workflow find dags associated to it
:param workflow: workflow to look dags for
:returns: dictionary with executed dags and their status
"""

executed_dags_for_workflow = {}
# find dags that were executed
for dag_id in WORKFLOW_DAGS[workflow.workflow_type]:
response = requests.get(
f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{workflow.id}",
headers=AIRFLOW_HEADERS,
)
if response.status_code == status.HTTP_200_OK:
executed_dags_for_workflow[dag_id] = response.content

return executed_dags_for_workflow


def find_failed_dag(workflow):
"""For a given workflow find failed dags
:param workflow: workflow to get failed dags
:returns: failed dag id or none
"""

executed_dags_for_workflow = find_executed_dags(workflow)

for dag, dag_data in executed_dags_for_workflow.items():
if dag_data["status"] == "failed":
return dag


def delete_workflow_dag(dag_id, workflow):
"""Delete dag run
:param dag_id: dag to be removed
:param workflow: workflow with the dag execution to be deleted
:returns: request response
"""
try:
response = requests.delete(
f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{str(workflow.id)}",
headers=AIRFLOW_HEADERS,
)
response.raise_for_status()
except RequestException:
data = {"error": response.json()}
return JsonResponse(data, status=status.HTTP_424_FAILED_DEPENDENCY)


def restart_workflow_dags(workflow, params):
"""Restarts dags of a given workflow.
:param workflow: workflow whoose dags should be restarted
:param params: parameters of new dag execution
:returns: request response
"""
executed_dags_for_workflow = find_executed_dags(workflow)

for dag_id in executed_dags_for_workflow:
# delete all executions of workflow
delete_workflow_dag(dag_id, workflow)

return trigger_airflow_dag(
WORKFLOW_DAGS[workflow.workflow_type][0], str(workflow.id), params
)

return JsonResponse(
{"error": "Failed to restart"}, status=status.HTTP_424_FAILED_DEPENDENCY
)
58 changes: 3 additions & 55 deletions backoffice/backoffice/workflows/api/views.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import logging

import requests
from django.shortcuts import get_object_or_404
from django_elasticsearch_dsl_drf.viewsets import BaseDocumentViewSet
from requests.exceptions import RequestException
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
Expand Down Expand Up @@ -131,62 +129,12 @@ def resolve(self, request, pk=None):

@action(detail=True, methods=["post"])
def restart(self, request, pk=None):
params = request.data.get("params")
restart_current_task = request.data.get("restart_current_task", False)

workflow = Workflow.objects.get(id=pk)

data = {"dry_run": False, "dag_run_id": pk, "reset_dag_runs": True}

executed_dags_for_workflow = {}
# find dags that were executed
for dag_id in WORKFLOW_DAGS[workflow.workflow_type]:
response = requests.get(
f"{airflow_utils.AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{pk}",
json=data,
headers=airflow_utils.AIRFLOW_HEADERS,
)
if response.status_code == status.HTTP_200_OK:
executed_dags_for_workflow[dag_id] = response.content

# assumes current task is one of the failed tasks
if restart_current_task:
data = {
"dry_run": False,
"dag_run_id": pk,
"reset_dag_runs": False,
"only_failed": True,
}

try:
response = requests.post(
f"{airflow_utils.AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/clearTaskInstances",
json=data,
headers=airflow_utils.AIRFLOW_HEADERS,
)
response.raise_for_status()

except RequestException:
data = {"error": response.json()}
return Response(data, status=status.HTTP_424_FAILED_DEPENDENCY)

return Response(response.json(), status=status.HTTP_200_OK)

# delete every executed_dag for this workflow
for dag_id in executed_dags_for_workflow:
# delete all executions of workflow
response = requests.delete(
f"{airflow_utils.AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{pk}",
headers=airflow_utils.AIRFLOW_HEADERS,
)

return airflow_utils.trigger_airflow_dag(
WORKFLOW_DAGS[workflow.workflow_type][0], pk, params
)
if request.data.get("restart_current_task"):
return airflow_utils.restart_failed_tasks(workflow)

return Response(
{"error": "Failed to restart"}, status=status.HTTP_424_FAILED_DEPENDENCY
)
return airflow_utils.restart_workflow_dags(workflow, request.data.get("params"))


class WorkflowDocumentView(BaseDocumentViewSet):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ interactions:
Content-Type:
- application/json
method: POST
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns
uri: http://host.docker.internal:8080/api/v1/dags/author_create_initialization_dag/dagRuns
response:
body:
string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_initialization_dag'
Expand All @@ -28,14 +28,46 @@ interactions:
Content-Type:
- application/problem+json
Date:
- Fri, 26 Jul 2024 13:52:28 GMT
- Mon, 29 Jul 2024 07:46:58 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 409
message: CONFLICT
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '0'
Content-Type:
- application/json
method: DELETE
uri: http://host.docker.internal:8080/api/v1/dags/author_create_approved_dag/dagRuns/00000000-0000-0000-0000-000000000000
response:
body:
string: ''
headers:
Connection:
- close
Content-Type:
- application/json
Date:
- Mon, 29 Jul 2024 07:46:58 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 204
message: NO CONTENT
- request:
body: '{"dag_run_id": "00000000-0000-0000-0000-000000000000", "conf": {"workflow_id":
"00000000-0000-0000-0000-000000000000", "create_ticket": true}}'
Expand All @@ -51,16 +83,16 @@ interactions:
Content-Type:
- application/json
method: POST
uri: http://airflow-webserver:8080/api/v1/dags/author_create_approved_dag/dagRuns
uri: http://host.docker.internal:8080/api/v1/dags/author_create_approved_dag/dagRuns
response:
body:
string: "{\n \"conf\": {\n \"create_ticket\": true,\n \"workflow_id\":
\"00000000-0000-0000-0000-000000000000\"\n },\n \"dag_id\": \"author_create_approved_dag\",\n
\ \"dag_run_id\": \"00000000-0000-0000-0000-000000000000\",\n \"data_interval_end\":
\"2024-07-26T13:52:28.878645+00:00\",\n \"data_interval_start\": \"2024-07-26T13:52:28.878645+00:00\",\n
\ \"end_date\": null,\n \"execution_date\": \"2024-07-26T13:52:28.878645+00:00\",\n
\"2024-07-29T07:46:58.558021+00:00\",\n \"data_interval_start\": \"2024-07-29T07:46:58.558021+00:00\",\n
\ \"end_date\": null,\n \"execution_date\": \"2024-07-29T07:46:58.558021+00:00\",\n
\ \"external_trigger\": true,\n \"last_scheduling_decision\": null,\n \"logical_date\":
\"2024-07-26T13:52:28.878645+00:00\",\n \"note\": null,\n \"run_type\":
\"2024-07-29T07:46:58.558021+00:00\",\n \"note\": null,\n \"run_type\":
\"manual\",\n \"start_date\": null,\n \"state\": \"queued\"\n}\n"
headers:
Connection:
Expand All @@ -70,12 +102,44 @@ interactions:
Content-Type:
- application/json
Date:
- Fri, 26 Jul 2024 13:52:28 GMT
- Mon, 29 Jul 2024 07:46:58 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 200
message: OK
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '0'
Content-Type:
- application/json
method: DELETE
uri: http://host.docker.internal:8080/api/v1/dags/author_create_approved_dag/dagRuns/00000000-0000-0000-0000-000000000000
response:
body:
string: ''
headers:
Connection:
- close
Content-Type:
- application/json
Date:
- Mon, 29 Jul 2024 07:46:58 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 204
message: NO CONTENT
version: 1
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ interactions:
Content-Type:
- application/json
method: POST
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns
uri: http://host.docker.internal:8080/api/v1/dags/author_create_initialization_dag/dagRuns
response:
body:
string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_initialization_dag'
Expand All @@ -28,7 +28,7 @@ interactions:
Content-Type:
- application/problem+json
Date:
- Fri, 26 Jul 2024 13:52:29 GMT
- Mon, 29 Jul 2024 07:46:58 GMT
Server:
- gunicorn
X-Robots-Tag:
Expand All @@ -37,8 +37,8 @@ interactions:
code: 409
message: CONFLICT
- request:
body: '{"dag_run_id": "3e9f6f3b-a0b1-411b-b061-0e607ca9b695", "conf": {"workflow_id":
"3e9f6f3b-a0b1-411b-b061-0e607ca9b695", "native_name": "NATIVE_NAME", "alternate_name":
body: '{"dag_run_id": "eac8ef74-04a2-44ad-8ccd-eaa05fada3cb", "conf": {"workflow_id":
"eac8ef74-04a2-44ad-8ccd-eaa05fada3cb", "native_name": "NATIVE_NAME", "alternate_name":
"NAME", "display_name": "FIRST_NAME", "family_name": "LAST_NAME", "given_name":
"GIVEN_NAME"}}'
headers:
Expand All @@ -53,18 +53,18 @@ interactions:
Content-Type:
- application/json
method: POST
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns
uri: http://host.docker.internal:8080/api/v1/dags/author_create_initialization_dag/dagRuns
response:
body:
string: "{\n \"conf\": {\n \"alternate_name\": \"NAME\",\n \"display_name\":
\"FIRST_NAME\",\n \"family_name\": \"LAST_NAME\",\n \"given_name\":
\"GIVEN_NAME\",\n \"native_name\": \"NATIVE_NAME\",\n \"workflow_id\":
\"3e9f6f3b-a0b1-411b-b061-0e607ca9b695\"\n },\n \"dag_id\": \"author_create_initialization_dag\",\n
\ \"dag_run_id\": \"3e9f6f3b-a0b1-411b-b061-0e607ca9b695\",\n \"data_interval_end\":
\"2024-07-26T13:52:29.169946+00:00\",\n \"data_interval_start\": \"2024-07-26T13:52:29.169946+00:00\",\n
\ \"end_date\": null,\n \"execution_date\": \"2024-07-26T13:52:29.169946+00:00\",\n
\"eac8ef74-04a2-44ad-8ccd-eaa05fada3cb\"\n },\n \"dag_id\": \"author_create_initialization_dag\",\n
\ \"dag_run_id\": \"eac8ef74-04a2-44ad-8ccd-eaa05fada3cb\",\n \"data_interval_end\":
\"2024-07-29T07:46:58.907705+00:00\",\n \"data_interval_start\": \"2024-07-29T07:46:58.907705+00:00\",\n
\ \"end_date\": null,\n \"execution_date\": \"2024-07-29T07:46:58.907705+00:00\",\n
\ \"external_trigger\": true,\n \"last_scheduling_decision\": null,\n \"logical_date\":
\"2024-07-26T13:52:29.169946+00:00\",\n \"note\": null,\n \"run_type\":
\"2024-07-29T07:46:58.907705+00:00\",\n \"note\": null,\n \"run_type\":
\"manual\",\n \"start_date\": null,\n \"state\": \"queued\"\n}\n"
headers:
Connection:
Expand All @@ -74,7 +74,7 @@ interactions:
Content-Type:
- application/json
Date:
- Fri, 26 Jul 2024 13:52:29 GMT
- Mon, 29 Jul 2024 07:46:58 GMT
Server:
- gunicorn
X-Robots-Tag:
Expand Down
Loading

0 comments on commit 80e05d3

Please sign in to comment.