diff --git a/backoffice/backoffice/workflows/airflow_utils.py b/backoffice/backoffice/workflows/airflow_utils.py index 2975f880..5806a5e5 100644 --- a/backoffice/backoffice/workflows/airflow_utils.py +++ b/backoffice/backoffice/workflows/airflow_utils.py @@ -55,13 +55,15 @@ def restart_failed_tasks(workflow_id, workflow_type): :param workflow_type: type of workflow to retrieve :returns: request response """ + dag_id = find_failed_dag(str(workflow_id), workflow_type) + if dag_id is None: + return JsonResponse({"message": "There are no failing tasks, skipping restart"}) - dag_id = find_failed_dag(workflow_id, workflow_type) # assumes current task is one of the failed tasks data = { "dry_run": False, "dag_run_id": str(workflow_id), - "reset_dag_runs": False, + "reset_dag_runs": True, "only_failed": True, } diff --git a/backoffice/backoffice/workflows/api/views.py b/backoffice/backoffice/workflows/api/views.py index 85125b29..87376943 100644 --- a/backoffice/backoffice/workflows/api/views.py +++ b/backoffice/backoffice/workflows/api/views.py @@ -116,11 +116,15 @@ def create(self, request): data=serializer.validated_data["data"], workflow_type=serializer.validated_data["workflow_type"], ) + workflow["data"][ + "$schema":"https://inspirehep.net/schemas/records/authors.json" + ] logger.info( "Trigger Airflow DAG: %s for %s", WORKFLOW_DAGS[workflow.workflow_type].initialize, workflow.id, ) + return airflow_utils.trigger_airflow_dag( WORKFLOW_DAGS[workflow.workflow_type].initialize, str(workflow.id), @@ -194,7 +198,9 @@ def restart(self, request, pk=None): workflow = Workflow.objects.get(id=pk) if request.data.get("restart_current_task"): - return airflow_utils.restart_failed_tasks(workflow) + return airflow_utils.restart_failed_tasks( + workflow.id, workflow.workflow_type + ) return airflow_utils.restart_workflow_dags( workflow.id, workflow.workflow_type, request.data.get("params") diff --git a/backoffice/backoffice/workflows/tests/cassettes/TestAirflowUtils.test_restart_failed_tasks_no_tasks.yaml b/backoffice/backoffice/workflows/tests/cassettes/TestAirflowUtils.test_restart_failed_tasks_no_tasks.yaml new file mode 100644 index 00000000..5f767dd5 --- /dev/null +++ b/backoffice/backoffice/workflows/tests/cassettes/TestAirflowUtils.test_restart_failed_tasks_no_tasks.yaml @@ -0,0 +1,184 @@ +interactions: +- request: + body: '{"dag_run_id": "00000000-0000-0000-0000-000000000001", "conf": {"workflow_id": + "00000000-0000-0000-0000-000000000001"}}' + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '119' + Content-Type: + - application/json + method: POST + uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns + response: + body: + string: "{\n \"conf\": {\n \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n + \ },\n \"dag_id\": \"author_create_initialization_dag\",\n \"dag_run_id\": + \"00000000-0000-0000-0000-000000000001\",\n \"data_interval_end\": \"2024-08-08T14:28:42.451958+00:00\",\n + \ \"data_interval_start\": \"2024-08-08T14:28:42.451958+00:00\",\n \"end_date\": + null,\n \"execution_date\": \"2024-08-08T14:28:42.451958+00:00\",\n \"external_trigger\": + true,\n \"last_scheduling_decision\": null,\n \"logical_date\": \"2024-08-08T14:28:42.451958+00:00\",\n + \ \"note\": null,\n \"run_type\": \"manual\",\n \"start_date\": null,\n + \ \"state\": \"queued\"\n}\n" + headers: + Connection: + - close + Content-Length: + - '579' + Content-Type: + - application/json + Date: + - Thu, 08 Aug 2024 14:28:42 GMT + Server: + - gunicorn + X-Robots-Tag: + - noindex, nofollow + status: + code: 200 + message: OK +- request: + body: null + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Type: + - application/json + method: GET + uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001 + response: + body: + string: "{\n \"conf\": {\n \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n + \ },\n \"dag_id\": \"author_create_initialization_dag\",\n \"dag_run_id\": + \"00000000-0000-0000-0000-000000000001\",\n \"data_interval_end\": \"2024-08-08T14:28:42.451958+00:00\",\n + \ \"data_interval_start\": \"2024-08-08T14:28:42.451958+00:00\",\n \"end_date\": + \"2024-08-08T14:28:48.317690+00:00\",\n \"execution_date\": \"2024-08-08T14:28:42.451958+00:00\",\n + \ \"external_trigger\": true,\n \"last_scheduling_decision\": \"2024-08-08T14:28:48.316860+00:00\",\n + \ \"logical_date\": \"2024-08-08T14:28:42.451958+00:00\",\n \"note\": null,\n + \ \"run_type\": \"manual\",\n \"start_date\": \"2024-08-08T14:28:42.816917+00:00\",\n + \ \"state\": \"success\"\n}\n" + headers: + Connection: + - close + Content-Length: + - '670' + Content-Type: + - application/json + Date: + - Thu, 08 Aug 2024 14:29:42 GMT + Server: + - gunicorn + X-Robots-Tag: + - noindex, nofollow + status: + code: 200 + message: OK +- request: + body: null + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Type: + - application/json + method: GET + uri: http://airflow-webserver:8080/api/v1/dags/author_create_approved_dag/dagRuns/00000000-0000-0000-0000-000000000001 + response: + body: + string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_approved_dag' + and DagRun ID: '00000000-0000-0000-0000-000000000001' not found\",\n \"status\": + 404,\n \"title\": \"DAGRun not found\",\n \"type\": \"https://airflow.apache.org/docs/apache-airflow/2.8.3/stable-rest-api-ref.html#section/Errors/NotFound\"\n}\n" + headers: + Connection: + - close + Content-Length: + - '294' + Content-Type: + - application/problem+json + Date: + - Thu, 08 Aug 2024 14:29:42 GMT + Server: + - gunicorn + X-Robots-Tag: + - noindex, nofollow + status: + code: 404 + message: NOT FOUND +- request: + body: null + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Type: + - application/json + method: GET + uri: http://airflow-webserver:8080/api/v1/dags/author_create_rejected_dag/dagRuns/00000000-0000-0000-0000-000000000001 + response: + body: + string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_rejected_dag' + and DagRun ID: '00000000-0000-0000-0000-000000000001' not found\",\n \"status\": + 404,\n \"title\": \"DAGRun not found\",\n \"type\": \"https://airflow.apache.org/docs/apache-airflow/2.8.3/stable-rest-api-ref.html#section/Errors/NotFound\"\n}\n" + headers: + Connection: + - close + Content-Length: + - '294' + Content-Type: + - application/problem+json + Date: + - Thu, 08 Aug 2024 14:29:42 GMT + Server: + - gunicorn + X-Robots-Tag: + - noindex, nofollow + status: + code: 404 + message: NOT FOUND +- request: + body: null + headers: + Accept: + - '*/*' + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '0' + Content-Type: + - application/json + method: DELETE + uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001 + response: + body: + string: '' + headers: + Connection: + - close + Content-Type: + - application/json + Date: + - Thu, 08 Aug 2024 14:29:42 GMT + Server: + - gunicorn + X-Robots-Tag: + - noindex, nofollow + status: + code: 204 + message: NO CONTENT +version: 1 diff --git a/backoffice/backoffice/workflows/tests/test_airflow_utils.py b/backoffice/backoffice/workflows/tests/test_airflow_utils.py index 2949e15d..2d508c0b 100644 --- a/backoffice/backoffice/workflows/tests/test_airflow_utils.py +++ b/backoffice/backoffice/workflows/tests/test_airflow_utils.py @@ -33,6 +33,13 @@ def test_restart_failed_tasks(self): ) self.assertEqual(response.status_code, 200) + @pytest.mark.vcr() + def test_restart_failed_tasks_no_tasks(self): + response = airflow_utils.restart_failed_tasks( + self.workflow_id, self.workflow_type + ) + self.assertEqual(response.status_code, 200) + @pytest.mark.vcr() def test_find_executed_dags(self): executed_dags_for_workflow = airflow_utils.find_executed_dags( diff --git a/workflows/dags/author/author_create/author_create_approved.py b/workflows/dags/author/author_create/author_create_approved.py index 90dfc32c..12ff699b 100644 --- a/workflows/dags/author/author_create/author_create_approved.py +++ b/workflows/dags/author/author_create/author_create_approved.py @@ -106,6 +106,7 @@ def create_author_on_inspire(**context: dict) -> str: workflow_management_hook.partial_update_workflow( workflow_id=context["params"]["workflow_id"], workflow_partial_update_data={"data": workflow_data["data"]}, + collection=AUTHORS, ) return status diff --git a/workflows/dags/author/author_create/author_create_init.py b/workflows/dags/author/author_create/author_create_init.py index 9fbfb207..43066a23 100644 --- a/workflows/dags/author/author_create/author_create_init.py +++ b/workflows/dags/author/author_create/author_create_init.py @@ -48,15 +48,6 @@ def set_workflow_status_to_running(**context): collection=AUTHORS, ) - @task() - def set_schema(**context): - schema = "https://inspirehep.net/schemas/records/authors.json" - workflow_management_hook.partial_update_workflow( - workflow_id=context["params"]["workflow_id"], - workflow_partial_update_data={"data": {"$schema": schema}}, - collection=AUTHORS, - ) - @task() def create_author_create_user_ticket(**context: dict) -> None: endpoint = "/api/tickets/create" @@ -91,7 +82,6 @@ def set_author_create_workflow_status_to_approval(**context: dict) -> None: # task dependencies ( set_workflow_status_to_running() - >> set_schema() >> create_author_create_user_ticket() >> set_author_create_workflow_status_to_approval() )