
User Actions: Restart workflow #53

Merged 1 commit on Aug 1, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -61,6 +61,9 @@ local_settings.py
 db.sqlite3
 db.sqlite3-journal
 
+# Airflow stuff:
+workflows/logs/
+
 # Flask stuff:
 instance/
 .webassets-cache
2 changes: 1 addition & 1 deletion backoffice/.envs/local/.django
@@ -20,5 +20,5 @@ OPENSEARCH_HOST=opensearch:9200
 OPENSEARCH_INDEX_PREFIX=backoffice-backend-local
 
 # Airflow
-AIRFLOW_BASE_URL=http://host.docker.internal:8080
+AIRFLOW_BASE_URL=http://airflow-webserver:8080
 AIRFLOW_TOKEN=YWlyZmxvdzphaXJmbG93
131 changes: 128 additions & 3 deletions backoffice/backoffice/workflows/airflow_utils.py
@@ -6,6 +6,8 @@
 from requests.exceptions import RequestException
 from rest_framework import status
 
+from backoffice.workflows.constants import WORKFLOW_DAGS
+
 AIRFLOW_BASE_URL = environ.get("AIRFLOW_BASE_URL")
 
 AIRFLOW_HEADERS = {
@@ -24,7 +26,7 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
     :returns: request response
     """
 
-    data = {"dag_run_id": workflow_id, "conf": {"workflow_id": workflow_id}}
+    data = {"dag_run_id": str(workflow_id), "conf": {"workflow_id": str(workflow_id)}}
 
     if extra_data is not None:
         data["conf"].update(extra_data)
@@ -33,10 +35,9 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
 
     try:
         logger.info(
-            "Triggering DAG %s with data: %s and %s %s",
+            "Triggering DAG %s with data: %s and %s",
             dag_id,
             data,
-            AIRFLOW_HEADERS,
             url,
         )
         response = requests.post(url, json=data, headers=AIRFLOW_HEADERS)
@@ -45,3 +46,127 @@ def trigger_airflow_dag(dag_id, workflow_id, extra_data=None):
     except RequestException:
         data = {"error": response.json()}
         return JsonResponse(data, status=status.HTTP_502_BAD_GATEWAY)
+
+
+def restart_failed_tasks(workflow_id, workflow_type):
+    """Restart the failed tasks of an Airflow DAG.
+
+    :param workflow_id: id of the workflow whose failed tasks should be restarted
+    :param workflow_type: type of the workflow
+    :returns: request response
+    """
+
+    dag_id = find_failed_dag(workflow_id, workflow_type)
+    # assumes the current task is one of the failed tasks
+    data = {
+        "dry_run": False,
+        "dag_run_id": str(workflow_id),
+        "reset_dag_runs": False,
+        "only_failed": True,
+    }
+
+    url = f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/clearTaskInstances"
+
+    try:
+        logger.info(
+            "Clearing failed tasks of DAG %s with data: %s and %s",
+            dag_id,
+            data,
+            url,
+        )
+        response = requests.post(
+            url,
+            json=data,
+            headers=AIRFLOW_HEADERS,
+        )
+        response.raise_for_status()
+        return JsonResponse(response.json())
+    except RequestException:
+        data = {"error": response.json()}
+        return JsonResponse(data, status=status.HTTP_424_FAILED_DEPENDENCY)
+
+
+def find_executed_dags(workflow_id, workflow_type):
+    """For a given workflow, find the DAGs that were executed for it.
+
+    :param workflow_id: id of the workflow whose executed DAGs should be retrieved
+    :param workflow_type: type of the workflow
+    :returns: dictionary with the executed DAGs and their status
+    """
+
+    executed_dags_for_workflow = {}
+    # find dags that were executed
+    for dag_id in WORKFLOW_DAGS[workflow_type]:
+        response = requests.get(
+            f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{workflow_id}",
+            headers=AIRFLOW_HEADERS,
+        )
+        if response.status_code == status.HTTP_200_OK:
+            executed_dags_for_workflow[dag_id] = response.json()
+
+    return executed_dags_for_workflow
+
+
+def find_failed_dag(workflow_id, workflow_type):
+    """For a given workflow, find the failed DAG.
+
+    :param workflow_id: id of the workflow whose failed DAG should be retrieved
+    :param workflow_type: type of the workflow
+
+    :returns: the failed DAG id, or None
+    """
+
+    executed_dags_for_workflow = find_executed_dags(str(workflow_id), workflow_type)
+
+    for dag, dag_data in executed_dags_for_workflow.items():
+        if dag_data["state"] == "failed":
+            return dag
+
+
+def delete_workflow_dag(dag_id, workflow_id):
+    """Delete a DAG run.
+
+    :param dag_id: DAG to be removed
+    :param workflow_id: id of the workflow whose DAG execution should be deleted
+    :returns: request response
+    """
+
+    url = f"{AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{str(workflow_id)}"
+    try:
+        logger.info(
+            "Deleting DAG run of DAG %s with no data and %s",
+            dag_id,
+            url,
+        )
+        response = requests.delete(
+            url,
+            headers=AIRFLOW_HEADERS,
+        )
+        response.raise_for_status()
+        return JsonResponse({"message": "Successfully deleted DAG"})
+    except RequestException:
+        return JsonResponse(
+            {"error": "Failed to delete DAG"}, status=status.HTTP_424_FAILED_DEPENDENCY
+        )
+
+
+def restart_workflow_dags(workflow_id, workflow_type, params=None):
+    """Restart the DAGs of a given workflow.
+
+    :param workflow_id: id of the workflow whose DAGs should be restarted
+    :param workflow_type: type of the workflow that will be restarted
+    :param params: parameters for the new DAG execution
+    :returns: request response
+    """
+    executed_dags_for_workflow = find_executed_dags(workflow_id, workflow_type)
+
+    for dag_id in executed_dags_for_workflow:
+        delete_workflow_dag(dag_id, str(workflow_id))
+
+    return trigger_airflow_dag(
+        WORKFLOW_DAGS[workflow_type].initialize, str(workflow_id), params
+    )
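
Taken together, these helpers implement two restart strategies: clearing only the failed task instances in place, or deleting every executed DAG run and re-triggering the workflow's initialize DAG. A minimal usage sketch follows; the UUID is a made-up example, so treat this as an illustration rather than the project's own code:

from backoffice.workflows import airflow_utils
from backoffice.workflows.constants import WorkflowType

# Hypothetical workflow id; in the backoffice this is the Workflow model's UUID.
workflow_id = "00000000-0000-0000-0000-000000000001"

# Partial restart: clear only the failed task instances of the failed DAG,
# letting Airflow reschedule them within the same DAG run.
airflow_utils.restart_failed_tasks(workflow_id, WorkflowType.AUTHOR_CREATE)

# Full restart: delete all executed DAG runs for this workflow, then
# re-trigger the workflow type's "initialize" DAG.
airflow_utils.restart_workflow_dags(workflow_id, WorkflowType.AUTHOR_CREATE)

Both helpers return a Django JsonResponse, so the view layer below can hand their result straight back to the client.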
20 changes: 17 additions & 3 deletions backoffice/backoffice/workflows/api/views.py
@@ -14,7 +14,7 @@
     WorkflowSerializer,
     WorkflowTicketSerializer,
 )
-from backoffice.workflows.constants import WORKFLOW_DAG, ResolutionDags
+from backoffice.workflows.constants import WORKFLOW_DAGS, ResolutionDags
 from backoffice.workflows.documents import WorkflowDocument
 from backoffice.workflows.models import Workflow, WorkflowTicket
 
@@ -105,11 +105,13 @@ def create(self, request):
         )
         logger.info(
             "Trigger Airflow DAG: %s for %s",
-            WORKFLOW_DAG[workflow.workflow_type],
+            WORKFLOW_DAGS[workflow.workflow_type].initialize,
             workflow.id,
         )
         return airflow_utils.trigger_airflow_dag(
-            WORKFLOW_DAG[workflow.workflow_type], str(workflow.id), workflow.data
+            WORKFLOW_DAGS[workflow.workflow_type].initialize,
+            str(workflow.id),
+            workflow.data,
         )
 
     @action(detail=True, methods=["post"])
@@ -123,10 +125,22 @@ def resolve(self, request, pk=None):
             ResolutionDags[serializer.validated_data["value"]],
             pk,
         )
+
         return airflow_utils.trigger_airflow_dag(
             ResolutionDags[serializer.validated_data["value"]].label, pk, extra_data
         )
 
+    @action(detail=True, methods=["post"])
+    def restart(self, request, pk=None):
+        workflow = Workflow.objects.get(id=pk)
+
+        if request.data.get("restart_current_task"):
+            return airflow_utils.restart_failed_tasks(
+                workflow.id, workflow.workflow_type
+            )
+
+        return airflow_utils.restart_workflow_dags(
+            workflow.id, workflow.workflow_type, request.data.get("params")
+        )
 
 
 class WorkflowDocumentView(BaseDocumentViewSet):
     def __init__(self, *args, **kwargs):
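
With this action in place, a client restarts a workflow through the viewset's restart route. A hedged sketch of the two call shapes, assuming the viewset is mounted under an /api/workflows/ prefix (the router configuration is not part of this diff):

import requests

# Hypothetical host and route prefix; only the "restart" action name and the
# request payload keys come from the view code above.
BASE = "http://localhost:8000/api/workflows"
workflow_id = "00000000-0000-0000-0000-000000000001"

# Restart only the failed tasks of the current (failed) DAG run.
requests.post(f"{BASE}/{workflow_id}/restart/", json={"restart_current_task": True})

# Restart the whole workflow from its initialize DAG, passing extra DAG run
# conf through "params".
requests.post(f"{BASE}/{workflow_id}/restart/", json={"params": {}})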
26 changes: 18 additions & 8 deletions backoffice/backoffice/workflows/constants.py
@@ -29,15 +29,25 @@ class WorkflowType(models.TextChoices):
 
 DEFAULT_WORKFLOW_TYPE = WorkflowType.HEP_CREATE
 
-# author dags for each workflow type
-WORKFLOW_DAG = {
-    WorkflowType.HEP_CREATE: "",
-    WorkflowType.HEP_UPDATE: "",
-    WorkflowType.AUTHOR_CREATE: "author_create_initialization_dag",
-    WorkflowType.AUTHOR_UPDATE: "author_update_dag",
-}
-
 
 class ResolutionDags(models.TextChoices):
     accept = "accept", "author_create_approved_dag"
     reject = "reject", "author_create_rejected_dag"
 
 
+class AuthorCreateDags(models.TextChoices):
+    initialize = "author_create_initialization_dag", "initialize"
+    approve = "author_create_approved_dag", "approve"
+    reject = "author_create_rejected_dag", "reject"
+
+
+class AuthorUpdateDags(models.TextChoices):
+    initialize = "author_update_dag", "initialize"
+
+
+WORKFLOW_DAGS = {
+    WorkflowType.HEP_CREATE: "",
+    WorkflowType.HEP_UPDATE: "",
+    WorkflowType.AUTHOR_CREATE: AuthorCreateDags,
+    WorkflowType.AUTHOR_UPDATE: AuthorUpdateDags,
+}
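
Because WORKFLOW_DAGS now maps each workflow type to a TextChoices class instead of a single DAG id, callers select a stage by attribute and get the DAG id back, since Django TextChoices members are str subclasses. A small illustration of that lookup:

from backoffice.workflows.constants import WORKFLOW_DAGS, WorkflowType

dags = WORKFLOW_DAGS[WorkflowType.AUTHOR_CREATE]

# Each member's value is the DAG id, so the member interpolates directly
# into the Airflow REST URLs built in airflow_utils.
assert dags.initialize == "author_create_initialization_dag"
assert dags.approve == "author_create_approved_dag"
assert dags.reject == "author_create_rejected_dag"

# Iterating the class (as find_executed_dags does) yields every stage's DAG id.
assert list(dags) == [
    "author_create_initialization_dag",
    "author_create_approved_dag",
    "author_create_rejected_dag",
]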
@@ -0,0 +1,112 @@
interactions:
- request:
    body: '{"dag_run_id": "00000000-0000-0000-0000-000000000001", "conf": {"workflow_id":
      "00000000-0000-0000-0000-000000000001"}}'
    headers:
      Accept:
      - '*/*'
      Accept-Encoding:
      - gzip, deflate
      Connection:
      - keep-alive
      Content-Length:
      - '119'
      Content-Type:
      - application/json
    method: POST
    uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns
  response:
    body:
      string: "{\n \"conf\": {\n \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n
        \ },\n \"dag_id\": \"author_create_initialization_dag\",\n \"dag_run_id\":
        \"00000000-0000-0000-0000-000000000001\",\n \"data_interval_end\": \"2024-07-30T12:13:41.736880+00:00\",\n
        \ \"data_interval_start\": \"2024-07-30T12:13:41.736880+00:00\",\n \"end_date\":
        null,\n \"execution_date\": \"2024-07-30T12:13:41.736880+00:00\",\n \"external_trigger\":
        true,\n \"last_scheduling_decision\": null,\n \"logical_date\": \"2024-07-30T12:13:41.736880+00:00\",\n
        \ \"note\": null,\n \"run_type\": \"manual\",\n \"start_date\": null,\n
        \ \"state\": \"queued\"\n}\n"
    headers:
      Connection:
      - close
      Content-Length:
      - '579'
      Content-Type:
      - application/json
      Date:
      - Tue, 30 Jul 2024 12:13:41 GMT
      Server:
      - gunicorn
      X-Robots-Tag:
      - noindex, nofollow
    status:
      code: 200
      message: OK
- request:
    body: null
    headers:
      Accept:
      - '*/*'
      Accept-Encoding:
      - gzip, deflate
      Connection:
      - keep-alive
      Content-Length:
      - '0'
      Content-Type:
      - application/json
    method: DELETE
    uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
  response:
    body:
      string: ''
    headers:
      Connection:
      - close
      Content-Type:
      - application/json
      Date:
      - Tue, 30 Jul 2024 12:13:41 GMT
      Server:
      - gunicorn
      X-Robots-Tag:
      - noindex, nofollow
    status:
      code: 204
      message: NO CONTENT
- request:
    body: null
    headers:
      Accept:
      - '*/*'
      Accept-Encoding:
      - gzip, deflate
      Connection:
      - keep-alive
      Content-Length:
      - '0'
      Content-Type:
      - application/json
    method: DELETE
    uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
Contributor:

why are we trying to delete twice?

Contributor (author):

Yes, by default the tearDown of every test deletes the DAG execution. Because test_delete_workflow_dag specifically tests that the deletion is done correctly, the tearDown won't have anything left to delete in this case, which is why the recorded second DELETE below is answered with 404. (A minimal sketch of this tearDown pattern follows the cassette.)
  response:
    body:
      string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_initialization_dag'
        and DagRun ID: '00000000-0000-0000-0000-000000000001' not found\",\n \"status\":
        404,\n \"title\": \"Not Found\",\n \"type\": \"https://airflow.apache.org/docs/apache-airflow/2.8.3/stable-rest-api-ref.html#section/Errors/NotFound\"\n}\n"
    headers:
      Connection:
      - close
      Content-Length:
      - '293'
      Content-Type:
      - application/problem+json
      Date:
      - Tue, 30 Jul 2024 12:13:41 GMT
      Server:
      - gunicorn
      X-Robots-Tag:
      - noindex, nofollow
    status:
      code: 404
      message: NOT FOUND
version: 1
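
As the author explains in the thread above, the second DELETE is issued by the test's tearDown. A minimal sketch of that pattern, assuming a Django test class along these lines; everything here is illustrative except the test_delete_workflow_dag name and the helpers from airflow_utils:

from django.test import TransactionTestCase

from backoffice.workflows import airflow_utils
from backoffice.workflows.constants import AuthorCreateDags


class TestAirflowUtils(TransactionTestCase):
    workflow_id = "00000000-0000-0000-0000-000000000001"

    def tearDown(self):
        # Best-effort cleanup after every test: remove the DAG run the test
        # created. After test_delete_workflow_dag the run is already gone,
        # which is why the cassette records a second DELETE answered with 404.
        airflow_utils.delete_workflow_dag(
            AuthorCreateDags.initialize, self.workflow_id
        )

    def test_delete_workflow_dag(self):
        # Create a DAG run, then verify delete_workflow_dag removes it.
        airflow_utils.trigger_airflow_dag(
            AuthorCreateDags.initialize, self.workflow_id
        )
        response = airflow_utils.delete_workflow_dag(
            AuthorCreateDags.initialize, self.workflow_id
        )
        self.assertEqual(response.status_code, 200)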