This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

User Actions: Restart workflow #53

Merged
merged 1 commit into inspirehep:main from restart-workflow on Aug 1, 2024

Conversation

Contributor

@DonHaul DonHaul commented Jul 10, 2024

#483

User Actions added:

  • restart
  • restart current
  • restart with params

Missing User Action:

  • skip to step (by default next)

/api/workflows/authors/<id>/restart (this might change, but for now it receives):

{
  "restart_current_task": true,  # if not specified, defaults to false
  "params": {dictionary with new params}  # requested in the user action, but it is not yet specified what these params are; if not specified, a full restart is done
}

It can also receive no body at all; in that case a full restart occurs.
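For illustration, calling the endpoint might look like this (a minimal sketch: the base URL is a placeholder and the workflow id is the dummy id used in the test cassettes below, not a real record):

import requests

BASE_URL = "http://localhost:8000"  # placeholder host, not part of the PR
workflow_id = "00000000-0000-0000-0000-000000000001"  # dummy id from the test cassettes

# Restart only the currently failed task of the workflow.
response = requests.post(
    f"{BASE_URL}/api/workflows/authors/{workflow_id}/restart",
    json={"restart_current_task": True},
)
print(response.status_code)

# No body (or an empty one) triggers a full restart.
response = requests.post(f"{BASE_URL}/api/workflows/authors/{workflow_id}/restart")
print(response.status_code)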

@DonHaul DonHaul marked this pull request as draft July 10, 2024 11:19
@DonHaul DonHaul force-pushed the restart-workflow branch 7 times, most recently from 76602b2 to b255a7b on July 17, 2024 08:24
@DonHaul DonHaul marked this pull request as ready for review July 17, 2024 08:30
@DonHaul DonHaul changed the title from "Restart workflow" to "User Actions: Restart workflow" Jul 17, 2024
@DonHaul DonHaul requested a review from drjova July 17, 2024 08:33
Contributor
@drjova drjova left a comment

Thank you, I have added a few comments.

@action(detail=True, methods=["post"])
def restart(self, request, pk=None):

params = request.data.get("params", None)
Contributor:

.get by default returns None, no need to specify it.

data = {"dry_run": False, "dag_run_id": pk, "reset_dag_runs": True}

executed_dags_for_workflow = {}
# find dags that were executed
Contributor:

we don't need the comments :)

Contributor Author:

Removed. Why not?

Comment on lines 114 to 122
for dag_id in AUTHOR_DAGS[workflow.workflow_type]:
response = requests.get(
f"{airflow_utils.AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{pk}",
json=data,
headers=airflow_utils.AIRFLOW_HEADERS,
)
if response.status_code == status.HTTP_200_OK:
executed_dags_for_workflow[dag_id] = response.content
Contributor:

Why do we need to iterate through all the dags? For example, this code will run for both the approve and reject DAGs.

Contributor Author:

For that case it will simply check which one of them got executed and restart it (if the response is 200 OK it means it got executed, regardless of whether it finished successfully or not).

In general this code serves to fetch which dags of a given workflow have run.
For example, there can be cases where, for a given workflow, only the author_create_initialization_dag dag ran and neither approve nor reject got triggered; in this case it will only restart this one dag.
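A minimal sketch of that check, using the helper names quoted in this thread (airflow_utils, AUTHOR_DAGS); the import paths are assumptions:

import requests
from rest_framework import status

from backoffice.workflows import airflow_utils  # assumed import path
from backoffice.workflows.constants import AUTHOR_DAGS  # assumed import path


def find_executed_dags_for_workflow(workflow_type, workflow_id):
    """Return the dag runs of this workflow that exist in Airflow.

    A 200 response means the dag run exists (it was executed at some point,
    successfully or not); a 404 just means that dag was never triggered.
    """
    executed_dags_for_workflow = {}
    for dag_id in AUTHOR_DAGS[workflow_type]:
        response = requests.get(
            f"{airflow_utils.AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{workflow_id}",
            headers=airflow_utils.AIRFLOW_HEADERS,
        )
        if response.status_code == status.HTTP_200_OK:
            executed_dags_for_workflow[dag_id] = response.content
    return executed_dags_for_workflow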

headers=airflow_utils.AIRFLOW_HEADERS,
)
if response.status_code != 200:
return Response({"error": "Failed to restart task"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
Contributor:

I would not return 500 for every error; since we already have API exceptions, I would create one for "failed to restart task", for example, with a different error code. 500 is usually for an unexpected server error, and in these cases we know what's going on.
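One way to follow this suggestion with DRF's exception machinery (the class name is illustrative; 424 is the status that shows up later in this PR):

from rest_framework import status
from rest_framework.exceptions import APIException


class WorkflowRestartError(APIException):
    """Illustrative API exception for a failed restart, instead of a blanket 500."""

    status_code = status.HTTP_424_FAILED_DEPENDENCY
    default_detail = "Failed to restart task."
    default_code = "workflow_restart_failed"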

json=data,
headers=airflow_utils.AIRFLOW_HEADERS,
)
if response.status_code != 200:
Contributor:

we can just raise for status and catch here
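For instance, the status check could become something like this (a sketch; the URL and payload stand for whichever Airflow call is made at this point):

import requests

from backoffice.workflows import airflow_utils  # assumed import path


def post_to_airflow(url, data):
    """POST to Airflow and translate HTTP errors into an API exception."""
    try:
        response = requests.post(url, json=data, headers=airflow_utils.AIRFLOW_HEADERS)
        response.raise_for_status()
    except requests.HTTPError as err:
        # WorkflowRestartError is the illustrative exception sketched above.
        raise WorkflowRestartError() from err
    return response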


return airflow_utils.trigger_airflow_dag(WORKFLOW_DAG[workflow.workflow_type], pk, params)

return Response({"error": "Failed to restart"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
Contributor:

We don't need this; we can try/catch the code above.


return Response(response.json(), status=status.HTTP_200_OK)

else:
Contributor:

we don't need else

Comment on lines 313 to 329
def patch_requests():
with patch("requests.post") as mock_post, patch("requests.get") as mock_get, patch(
"requests.delete"
) as mock_delete:

# Configure the mock for requests.post
mock_post.return_value.status_code = 200
mock_post.return_value.json.return_value = {"key": "value"}

# Configure the mock for requests.get
mock_get.return_value.status_code = 200
mock_get.return_value.json.return_value = {"data": "some_data"}

# Configure the mock for requests.delete
mock_delete.return_value.status_code = 204

yield mock_post, mock_get, mock_delete
Contributor:

we can use pytest-vcr for that
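A quick sketch of what that looks like with pytest-vcr (the marker records real HTTP traffic into a cassette the first time and replays it afterwards; the URL is the dummy one from the cassettes in this PR, and auth headers are omitted):

import pytest
import requests


@pytest.mark.vcr()
def test_dag_run_exists():
    # First run records the interaction into a cassette; later runs replay it,
    # so no mock wiring for requests.get/post/delete is needed.
    response = requests.get(
        "http://airflow-webserver:8080/api/v1/dags/"
        "author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001"
    )
    assert response.status_code == 200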


@DonHaul DonHaul force-pushed the restart-workflow branch 4 times, most recently from a0d762e to 4424c53 on July 26, 2024 09:23
- application/json
User-Agent:
- python-requests/2.31.0
method: POST
Contributor:

https://github.com/inspirehep/inspirehep/blob/master/backend/tests/conftest.py#L11-L36

we need to have a similar config so we don't record internal services, such as opensearch
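A sketch of such a conftest fixture, modelled on the linked inspirehep config (pytest-vcr picks up a vcr_config fixture automatically; the host names are examples, not the exact list used there):

import pytest


@pytest.fixture(scope="module")
def vcr_config():
    return {
        "ignore_hosts": ["opensearch", "localhost", "127.0.0.1"],  # example internal services
        "filter_headers": ["Authorization"],  # keep tokens out of cassettes
        "record_mode": "once",
    }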

Contributor Author
@DonHaul DonHaul Jul 26, 2024

Now that I think about it, backoffice tests are currently only accessing internal services, since the workflows are now also contained in this repo. Removing pytest-vcr for now.

Contributor:

The workflows, despite being in the same repo, are not the same service, and for the moment we keep them separate.

@DonHaul DonHaul force-pushed the restart-workflow branch 4 times, most recently from 799feae to 9ae09c1 on July 26, 2024 12:48
Contributor
@drjova drjova left a comment

Thank you, a few comments.



# author dags for each workflow type
AUTHOR_DAGS = {
Contributor:

Don't we already have this on line 33?

2024-07-26
Contributor:

actually we shouldn't include the logs folder, could you please delete it and add it to .gitignore?

from . import views

urlpatterns = [
path(
Contributor:

why do we have this here? We already have the definitions here

router.register("workflows", WorkflowViewSet, basename="workflows")
router.register(

Contributor Author:

Indeed, it was not doing anything; probably part of an old implementation I had tried.
Removed.

Comment on lines 149 to 150
if response.status_code == status.HTTP_200_OK:
executed_dags_for_workflow[dag_id] = response.content
Contributor:

response.raise_for_status()

Contributor Author:

I believe this use case is a bit different. If a response is different from 200 OK it's not necessarily an error; it simply means the workflow didn't reach that dag execution yet (e.g. if it's in the approval status, the request to check whether the accept dag was executed will fail).

executed_dags_for_workflow[dag_id] = response.content

# assumes current task is one of the failed tasks
if restart_current_task:
Contributor:

Here the workflows are already restarted, right? So there is not much point in restarting one task. Am I missing something? Instead we should either restart all or restart a specific task.

Contributor Author:

Here the workflows have not yet been restarted; the section of code before it is just checking which dags ran for this workflow. Moving this section (lines 141-150) below to make it clearer.
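To make the intended order concrete, a rough outline of the two paths being discussed (helper names are the ones appearing later in this thread, except clear_failed_tasks, which is a hypothetical name; import paths are assumptions):

from backoffice.workflows import airflow_utils  # assumed import path
from backoffice.workflows.constants import WORKFLOW_DAGS  # assumed import path


def restart_workflow(workflow_id, workflow_type, restart_current_task=False, params=None):
    # Step 1: only inspect which dags of this workflow have run; nothing is restarted yet.
    executed_dags = airflow_utils.find_executed_dags(workflow_id, workflow_type)

    if restart_current_task:
        # Step 2a: clear the failed tasks of the dag that is currently stuck.
        failed_dag = airflow_utils.find_failed_dag(workflow_id, workflow_type)
        return airflow_utils.clear_failed_tasks(failed_dag, workflow_id)  # hypothetical helper

    # Step 2b: full restart: delete the existing dag runs, then retrigger the
    # initialization dag, optionally with new params.
    for dag_id in executed_dags:
        airflow_utils.delete_workflow_dag(dag_id, workflow_id)
    return airflow_utils.trigger_airflow_dag(
        WORKFLOW_DAGS[workflow_type].initialize, workflow_id, params
    )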

backoffice/backoffice/workflows/api/views.py (resolved conversation)
@DonHaul DonHaul force-pushed the restart-workflow branch 7 times, most recently from 1d03468 to 7ee5c32 on July 29, 2024 14:37
headers=AIRFLOW_HEADERS,
)
response.raise_for_status()
return HttpResponse()
Contributor:

Why are we using HttpResponse here and JsonResponse in other places?

Contributor Author:

This request returns 204 NO CONTENT and the response was empty, which was failing with JsonResponse as it requires content.
Alternatively I can re-add it as a JsonResponse with some informative JSON object.

Contributor:

Yes, a JSON response with a success message.
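i.e. something along these lines (a sketch; the message text is illustrative):

from django.http import JsonResponse
from rest_framework import status


def restart_success_response():
    # Return an informative JSON body instead of an empty HttpResponse.
    return JsonResponse({"message": "Failed tasks restarted successfully"}, status=status.HTTP_200_OK)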

executed_dags_for_workflow = find_executed_dags(workflow)

for dag_id in executed_dags_for_workflow:
# delete all executions of workflow
Contributor:

we don't need this :)

return JsonResponse(data, status=status.HTTP_424_FAILED_DEPENDENCY)


def find_executed_dags(workflow):
Contributor:

Do we need to pass around the whole object? We can simply pass the id.

Contributor Author:

We also need the workflow_type, to find which specific dags have been executed. Should I just pass the id and workflow_type instead of the full workflow object?

return executed_dags_for_workflow


def find_failed_dag(workflow):
Contributor:

ditto

backoffice/backoffice/workflows/airflow_utils.py (outdated, resolved conversation)
self.dag_id, str(self.workflow.id)
)

def tearDown(self) -> None:
Contributor:

we don't use types :)


@pytest.mark.vcr()
def test_restart_failed_tasks(self):
time.sleep(20) # wait for dag to fail
Contributor:

Do we really need that? It's a recording anyway.

Contributor Author:

It was at least needed to make the recording; I can remove it.
However, if for some reason we need to re-record, these delays need to be added back or the tests will fail.

Contributor:

Let's remove it; we are adding 40 extra seconds to the tests, and in general it's not recommended to block execution with sleep.
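If the cassettes ever need to be re-recorded, polling the dag run state is a common alternative to a fixed sleep (a sketch, assuming the airflow_utils helpers used elsewhere in this PR; the function name is hypothetical):

import time

import requests

from backoffice.workflows import airflow_utils  # assumed import path


def wait_for_dag_run_state(dag_id, dag_run_id, expected_state="failed", timeout=60, interval=2):
    """Poll the Airflow dag run until it reaches `expected_state` or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = requests.get(
            f"{airflow_utils.AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
            headers=airflow_utils.AIRFLOW_HEADERS,
        )
        if response.ok and response.json().get("state") == expected_state:
            return True
        time.sleep(interval)
    raise TimeoutError(f"dag run {dag_run_id} did not reach state {expected_state!r}")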


@pytest.mark.vcr()
def test_find_failed_dag(self):
time.sleep(20) # wait for dag to fail
Contributor:

ditto

return HttpResponse(status=status.HTTP_424_FAILED_DEPENDENCY)


def restart_workflow_dags(workflow, params=None):
Contributor:

ditto

@DonHaul DonHaul force-pushed the restart-workflow branch from f7c9b3d to 3a4ce92 on July 30, 2024 09:06
Contributor
@drjova drjova left a comment

Few more comments, thank you 🙏

"Clearing Failed Tasks of DAG %s with data: %s and %s %s",
dag_id,
data,
AIRFLOW_HEADERS,
Contributor:

this will expose the token in the logs and it's not recommended

Contributor Author:

Okay, they were already being exposed in other logs from a previous PR we had merged, so I thought it was OK. Removing the tokens from those logs as well.
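A minimal sketch of the safer log call, simply dropping the headers from the message (the logger setup is illustrative):

import logging

logger = logging.getLogger(__name__)


def log_clear_failed_tasks(dag_id, data):
    # Log the request payload, but never the Airflow headers, which carry the auth token.
    logger.info("Clearing failed tasks of DAG %s with data: %s", dag_id, data)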

logger.info(
"Deketing dag Failed Tasks of DAG %s with no data and %s %s",
dag_id,
AIRFLOW_HEADERS,
Contributor:

ditto

headers=AIRFLOW_HEADERS,
)
response.raise_for_status()
return HttpResponse()
Contributor:

Yes, a JSON response with a success message.


@pytest.mark.vcr()
def test_restart_failed_tasks(self):
time.sleep(20) # wait for dag to fail
Contributor:

Let's remove it; we are adding 40 extra seconds to the tests, and in general it's not recommended to block execution with sleep.

backoffice/backoffice/workflows/constants.py (resolved conversation)
Content-Type:
- application/json
method: GET
uri: http://airflow-webserver:8080/api/v1/dags/author_create_approved_dag/dagRuns/00000000-0000-0000-0000-000000000001
Contributor:

There is something wrong; it actually returns 404. Could we please double-check all the cassettes to make sure we have the correct responses?

Contributor Author:

Cassettes have been reviewed, minor fixes.
Some of them will indeed have responses that contain 404, as it's the only way I found to check whether a given dag was executed or not.

@DonHaul DonHaul force-pushed the restart-workflow branch from 645026c to a243824 on July 30, 2024 14:18
Contributor
@drjova drjova left a comment

@DonHaul few minor changes :)

def find_failed_dag(workflow_id, workflow_type):
"""For a given workflow find failed dags.

:param workflow: workflow to get failed dags
Contributor:

we have to update docstrings :)
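For example, the docstring could be brought in line with the new signature (a sketch; the wording paraphrases the original docstring):

def find_failed_dag(workflow_id, workflow_type):
    """For a given workflow, find its failed dag.

    :param workflow_id: id of the workflow whose dag runs are checked
    :param workflow_type: type of the workflow, used to resolve its dag ids
    """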

def restart_workflow_dags(workflow_id, workflow_type, params=None):
"""Restarts dags of a given workflow.

:param workflow: workflow whoose dags should be restarted
Contributor:

ditto

def find_executed_dags(workflow_id, workflow_type):
"""For a given workflow find dags associated to it.

:param workflow: workflow to look dags for
Contributor:

ditto

Content-Type:
- application/json
method: DELETE
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
Contributor:

why are we trying to delete twice?

Contributor Author:

Yes, so by default, in the tearDown of every test we are deleting the execution.
Because the test_delete_workflow_dag test specifically checks that the deletion is done correctly, the tearDown won't be able to delete anything in that case.

def tearDown(self):
super().tearDown()
airflow_utils.delete_workflow_dag(
WORKFLOW_DAGS[self.workflow.workflow_type].initialize, str(self.workflow.id)
Contributor:

Let's be consistent; in some places we use str() and in others we don't.

status="running",
core=True,
is_update=False,
workflow_type="AUTHOR_CREATE",
Contributor:

Please let's use the constant everywhere, so it's easy to propagate changes and we avoid changing strings one by one.
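A sketch of the idea, assuming the constant lives in backoffice/backoffice/workflows/constants.py (the class and attribute names are assumptions; only the "AUTHOR_CREATE" value comes from the quoted test):

from django.db import models


class WorkflowType(models.TextChoices):
    AUTHOR_CREATE = "AUTHOR_CREATE"  # value taken from the quoted test; other members omitted


# In tests and factories, reference the constant rather than the raw string:
workflow_type = WorkflowType.AUTHOR_CREATE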

@DonHaul DonHaul force-pushed the restart-workflow branch from 11f79fa to 93fc158 on July 31, 2024 15:17
@drjova drjova merged commit 5250f56 into inspirehep:main Aug 1, 2024
6 checks passed
Labels: None yet
Projects: None yet
2 participants