Conversation
force-pushed from 76602b2 to b255a7b
Thank you, I have added a few comments.
backoffice/workflows/api/views.py
Outdated
@action(detail=True, methods=["post"])
def restart(self, request, pk=None):

    params = request.data.get("params", None)
.get by default returns None, no need to specify it.
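For illustration, the two forms behave identically:

```python
payload = {"other": 1}

# the second argument to .get defaults to None, so these are equivalent
explicit = payload.get("params", None)
implicit = payload.get("params")

assert explicit is None and implicit is None
```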
backoffice/workflows/api/views.py
Outdated
data = {"dry_run": False, "dag_run_id": pk, "reset_dag_runs": True}

executed_dags_for_workflow = {}
# find dags that were executed
we don't need the comments :)
Removed. Why not?
backoffice/workflows/api/views.py
Outdated
for dag_id in AUTHOR_DAGS[workflow.workflow_type]:
    response = requests.get(
        f"{airflow_utils.AIRFLOW_BASE_URL}/api/v1/dags/{dag_id}/dagRuns/{pk}",
        json=data,
        headers=airflow_utils.AIRFLOW_HEADERS,
    )
    if response.status_code == status.HTTP_200_OK:
        executed_dags_for_workflow[dag_id] = response.content
Why do we need to iterate through all the dags? For example, this code will run both the approve and reject DAGs.
For that case it will simply check which one of them got executed and restart it (if the response is 200 OK it means it got executed, regardless of whether it finished successfully). In general this code serves to fetch which dags of a given workflow have run. For example, there can be cases where, for a given workflow, only the author_create_initialization_dag dag ran and neither approve nor reject got triggered; in this case it will only restart that one dag.
backoffice/workflows/api/views.py
Outdated
    headers=airflow_utils.AIRFLOW_HEADERS,
)
if response.status_code != 200:
    return Response({"error": "Failed to restart task"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
I would not put 500 in every error. As we already have API exceptions, I would create one for "failed to restart task", for example, with a different error code. 500 is usually for unexpected server errors; in these cases we know what's going on.
backoffice/workflows/api/views.py
Outdated
    json=data,
    headers=airflow_utils.AIRFLOW_HEADERS,
)
if response.status_code != 200:
we can just raise for status and catch here
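The suggestion could look roughly like this; `restart_or_raise` is a hypothetical helper name for illustration, not code from the PR:

```python
import requests


def restart_or_raise(response):
    """Sketch: let raise_for_status() signal non-2xx replies and translate
    the HTTPError into a domain error, instead of comparing status_code to 200."""
    try:
        response.raise_for_status()
    except requests.HTTPError as exc:
        raise RuntimeError("Failed to restart task") from exc
    return response
```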
backoffice/workflows/api/views.py
Outdated
return airflow_utils.trigger_airflow_dag(WORKFLOW_DAG[workflow.workflow_type], pk, params)

return Response({"error": "Failed to restart"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
We don't need this; we can try/except the code above.
backoffice/workflows/api/views.py
Outdated
    return Response(response.json(), status=status.HTTP_200_OK)

else:
we don't need else
def patch_requests():
    with patch("requests.post") as mock_post, patch("requests.get") as mock_get, patch(
        "requests.delete"
    ) as mock_delete:
        # Configure the mock for requests.post
        mock_post.return_value.status_code = 200
        mock_post.return_value.json.return_value = {"key": "value"}

        # Configure the mock for requests.get
        mock_get.return_value.status_code = 200
        mock_get.return_value.json.return_value = {"data": "some_data"}

        # Configure the mock for requests.delete
        mock_delete.return_value.status_code = 204

        yield mock_post, mock_get, mock_delete
we can use pytest-vcr for that
force-pushed from a0d762e to 4424c53
- application/json
User-Agent:
- python-requests/2.31.0
method: POST
https://github.com/inspirehep/inspirehep/blob/master/backend/tests/conftest.py#L11-L36
we need to have similar config not to record internal services, such as opensearch
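A sketch of such a conftest.py, modeled on the linked inspirehep config; the exact host names are assumptions for this setup:

```python
import pytest

# hosts of internal services whose traffic should not be recorded (assumed names)
IGNORED_HOSTS = ("localhost", "opensearch", "postgres", "redis")

VCR_SETTINGS = {
    "ignore_hosts": IGNORED_HOSTS,
    # never store credentials inside cassettes
    "filter_headers": [("Authorization", "DUMMY")],
    "record_mode": "once",
}


@pytest.fixture(scope="module")
def vcr_config():
    # pytest-vcr picks up a fixture with this exact name
    return VCR_SETTINGS
```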
Now that I think about it, backoffice tests are currently only accessing internal services, since the workflows are now also contained in this repo. Removing pytest-vcr for now.
The workflows, despite being in the same repo, are not the same service, and for the moment we keep them separate.
force-pushed from 799feae to 9ae09c1
Thank you, a few comments.
# author dags for each workflow type
AUTHOR_DAGS = {
Don't we already have this on line 33?
workflows/logs/scheduler/latest
Outdated
2024-07-26
Actually we shouldn't include the logs folder; could you please delete it and add it to .gitignore?
from . import views

urlpatterns = [
    path(
why do we have this here? We already have the definitions here
backoffice/backoffice/config/api_router.py
Lines 22 to 23 in b9562c6
router.register("workflows", WorkflowViewSet, basename="workflows")
router.register(
Indeed, it was not doing anything; probably part of an old implementation I had tried. Removed.
if response.status_code == status.HTTP_200_OK:
    executed_dags_for_workflow[dag_id] = response.content
response.raise_for_status()
I believe this use case is a bit different. If a response is different from 200 OK it's not necessarily an error; it simply means the workflow didn't reach that dag execution yet (e.g. if it's in the approval status, the request to check whether the accept dag was executed will fail).
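That intent can be sketched with the HTTP call injected, which also makes the "non-200 means not executed" convention easy to unit test. The function and dict names follow the PR; the `fetch` parameter and `_sketch` suffix are added for illustration:

```python
def find_executed_dags_sketch(workflow_id, dag_ids, fetch):
    """Collect the dags that actually ran for a workflow.

    A non-200 reply is not treated as an error: it simply means that
    dag was never executed for this workflow, so raise_for_status()
    would be the wrong tool here.
    """
    executed_dags_for_workflow = {}
    for dag_id in dag_ids:
        response = fetch(dag_id, workflow_id)  # e.g. a GET to the Airflow API
        if response.status_code == 200:
            executed_dags_for_workflow[dag_id] = response.content
    return executed_dags_for_workflow
```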
executed_dags_for_workflow[dag_id] = response.content

# assumes current task is one of the failed tasks
if restart_current_task:
Here the workflows are already restarted, right? So there is not much of a point in restarting one task. Am I missing something? Instead we should either restart all or restart a specific task.
Here the workflows have not yet been restarted; the section of code before it is just checking which dags ran for this workflow. Moving lines 141-150 below to make it clearer.
force-pushed from 1d03468 to 7ee5c32
    headers=AIRFLOW_HEADERS,
)
response.raise_for_status()
return HttpResponse()
Why are we using HttpResponse here and JsonResponse in other places?
This request returns 204 NO CONTENT and the response was empty, which was failing with JsonResponse as it requires content. Alternatively, I can re-add JsonResponse with some informative JSON object.
Yes, a JSON response with a success message.
executed_dags_for_workflow = find_executed_dags(workflow)

for dag_id in executed_dags_for_workflow:
    # delete all executions of workflow
we don't need this :)
return JsonResponse(data, status=status.HTTP_424_FAILED_DEPENDENCY)


def find_executed_dags(workflow):
Do we need to pass around the whole object? We can simply pass the id.
We also need the workflow_type, to find which specific dags have been executed. Should I just pass id and workflow_type instead of the full workflow object?
return executed_dags_for_workflow


def find_failed_dag(workflow):
ditto
    self.dag_id, str(self.workflow.id)
)

def tearDown(self) -> None:
we don't use types :)
@pytest.mark.vcr()
def test_restart_failed_tasks(self):
    time.sleep(20)  # wait for dag to fail
Do we really need that? It's a recording anyway.
It was at least needed to make the recording; I can remove it. However, if for some reason we need to re-record, these delays need to be added back or the tests will fail.
Let's remove it; we are adding 40 extra seconds to the tests, and in general it's not recommended to block execution with sleep.
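If re-recording ever does require waiting for the dag to fail, a bounded poll returns as soon as the condition holds instead of always blocking; `wait_for` is a hypothetical helper, not part of the PR:

```python
import time


def wait_for(condition, timeout=30.0, interval=0.5):
    """Poll `condition` until it is true or `timeout` seconds elapse.

    Returns True as soon as the condition holds, so the common case is
    much faster than an unconditional time.sleep(20).
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```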
@pytest.mark.vcr()
def test_find_failed_dag(self):
    time.sleep(20)  # wait for dag to fail
ditto
return HttpResponse(status=status.HTTP_424_FAILED_DEPENDENCY)


def restart_workflow_dags(workflow, params=None):
ditto
force-pushed from f7c9b3d to 3a4ce92
A few more comments, thank you 🙏
"Clearing Failed Tasks of DAG %s with data: %s and %s %s",
dag_id,
data,
AIRFLOW_HEADERS,
this will expose the token in the logs and it's not recommended
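One way to keep the log line while hiding credentials is to mask sensitive header values before logging; the helper name is hypothetical:

```python
def redact_headers(headers, sensitive=("authorization",)):
    """Return a copy of `headers` safe for logging, with credential
    values masked so tokens never end up in the logs."""
    return {
        key: "***" if key.lower() in sensitive else value
        for key, value in headers.items()
    }
```

The logger call would then receive `redact_headers(AIRFLOW_HEADERS)` instead of the raw dict.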
Okay, they were already being exposed in other logs from a previous PR we had merged, so I thought it was OK. Removing tokens from those logs as well.
logger.info(
    "Deketing dag Failed Tasks of DAG %s with no data and %s %s",
    dag_id,
    AIRFLOW_HEADERS,
ditto
Content-Type:
- application/json
method: GET
uri: http://airflow-webserver:8080/api/v1/dags/author_create_approved_dag/dagRuns/00000000-0000-0000-0000-000000000001
There is something wrong; it actually returns 404. Could we please double-check that all the cassettes have the correct responses?
Cassettes have been reviewed, minor fixes. Some of them will indeed have responses that contain 404, as it's the only way I found to check whether a given dag was executed or not.
force-pushed from 645026c to a243824
@DonHaul few minor changes :)
def find_failed_dag(workflow_id, workflow_type):
    """For a given workflow find failed dags.

    :param workflow: workflow to get failed dags
we have to update docstrings :)
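An updated docstring matching the new signature might read as follows; the parameter descriptions are assumptions, and the body is elided in this sketch:

```python
def find_failed_dag(workflow_id, workflow_type):
    """For a given workflow find failed dags.

    :param workflow_id: id of the workflow to get failed dags for
    :param workflow_type: type of the workflow, used to select its dags
    """
    raise NotImplementedError  # body elided in this sketch
```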
def restart_workflow_dags(workflow_id, workflow_type, params=None):
    """Restarts dags of a given workflow.

    :param workflow: workflow whoose dags should be restarted
ditto
def find_executed_dags(workflow_id, workflow_type):
    """For a given workflow find dags associated to it.

    :param workflow: workflow to look dags for
ditto
Content-Type:
- application/json
method: DELETE
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
why are we trying to delete twice?
Yes, so by default in the tearDown of every test we are deleting the execution. Because for the test_delete_workflow_dag test we are specifically testing that the deletion is done correctly, the tearDown won't be able to delete anything in this case.
def tearDown(self):
    super().tearDown()
    airflow_utils.delete_workflow_dag(
        WORKFLOW_DAGS[self.workflow.workflow_type].initialize, str(self.workflow.id)
Let's be consistent; in some places we do str and in others we don't.
status="running",
core=True,
is_update=False,
workflow_type="AUTHOR_CREATE",
Please let's use the constant everywhere, so changes are easy to propagate and we avoid changing strings one by one.
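For example, the fixture above would reference a shared constant instead of the bare string; the constant name and location are assumptions:

```python
# e.g. in a shared constants module
WORKFLOW_TYPE_AUTHOR_CREATE = "AUTHOR_CREATE"

workflow_fixture = dict(
    status="running",
    core=True,
    is_update=False,
    workflow_type=WORKFLOW_TYPE_AUTHOR_CREATE,  # constant instead of a bare string
)
```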
force-pushed from 11f79fa to 93fc158
#483
User Actions added:
Missing User Action:
/api/workflows/authors/<id>/restart
It might change, but for now this is what it receives; it can also receive nothing, in which case a full restart occurs.