Skip to content
This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

restart actions: on restart use data from previous run #91

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions backoffice/backoffice/workflows/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,12 @@ def restart_workflow_dags(workflow_id, workflow_type, params=None):
:param params: parameters of new dag execution
:returns: request response
"""

data = fetch_data_workflow_dag(workflow_id, workflow_type)
delete_workflow_dag_runs(workflow_id, workflow_type)

return trigger_airflow_dag(
WORKFLOW_DAGS[workflow_type].initialize, str(workflow_id), params
WORKFLOW_DAGS[workflow_type].initialize, str(workflow_id), params or data
)


Expand All @@ -171,9 +173,19 @@ def delete_workflow_dag_runs(workflow_id, workflow_type):
"""
executed_dags_for_workflow = find_executed_dags(workflow_id, workflow_type)

for dag_id in executed_dags_for_workflow:
for dag_id, _ in executed_dags_for_workflow.items():
delete_workflow_dag(dag_id, str(workflow_id))

return JsonResponse(
data={"message": f"Dag runs for worfklow {workflow_id} have been deleted"}
)

def fetch_data_workflow_dag(workflow_id, workflow_type):
"""Fetches Data that the workflow ran with

:param workflow_id: workflow_id for dag to get data of
:param workflow_type: type of workflow
:returns: data workflow dags used
"""

executed_dags_for_workflow = find_executed_dags(workflow_id, workflow_type)

_, dag = next(iter(executed_dags_for_workflow.items()))
return dag["conf"].get("data")
1 change: 1 addition & 0 deletions backoffice/backoffice/workflows/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def restart(self, request, pk=None):
workflow.id, workflow.workflow_type
)

Decision.objects.filter(workflow=workflow).delete()
return airflow_utils.restart_workflow_dags(
workflow.id, workflow.workflow_type, request.data.get("params")
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
interactions:
- request:
body: '{"dag_run_id": "00000000-0000-0000-0000-000000000001", "conf": {"workflow_id":
"00000000-0000-0000-0000-000000000001", "data": {"test": "test"}}}'
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '145'
Content-Type:
- application/json
method: POST
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns
response:
body:
string: "{\n \"conf\": {\n \"data\": {\n \"test\": \"test\"\n },\n
\ \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n },\n \"dag_id\":
\"author_create_initialization_dag\",\n \"dag_run_id\": \"00000000-0000-0000-0000-000000000001\",\n
\ \"data_interval_end\": \"2024-08-30T11:26:33.983555+00:00\",\n \"data_interval_start\":
\"2024-08-30T11:26:33.983555+00:00\",\n \"end_date\": null,\n \"execution_date\":
\"2024-08-30T11:26:33.983555+00:00\",\n \"external_trigger\": true,\n \"last_scheduling_decision\":
null,\n \"logical_date\": \"2024-08-30T11:26:33.983555+00:00\",\n \"note\":
null,\n \"run_type\": \"manual\",\n \"start_date\": null,\n \"state\":
\"queued\"\n}\n"
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Length:
- '621'
Content-Type:
- application/json
Date:
- Fri, 30 Aug 2024 11:26:34 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 200
message: OK
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: "{\n \"conf\": {\n \"data\": {\n \"test\": \"test\"\n },\n
\ \"workflow_id\": \"00000000-0000-0000-0000-000000000001\"\n },\n \"dag_id\":
\"author_create_initialization_dag\",\n \"dag_run_id\": \"00000000-0000-0000-0000-000000000001\",\n
\ \"data_interval_end\": \"2024-08-30T11:26:33.983555+00:00\",\n \"data_interval_start\":
\"2024-08-30T11:26:33.983555+00:00\",\n \"end_date\": \"2024-08-30T11:26:51.968680+00:00\",\n
\ \"execution_date\": \"2024-08-30T11:26:33.983555+00:00\",\n \"external_trigger\":
true,\n \"last_scheduling_decision\": \"2024-08-30T11:26:51.966580+00:00\",\n
\ \"logical_date\": \"2024-08-30T11:26:33.983555+00:00\",\n \"note\": null,\n
\ \"run_type\": \"manual\",\n \"start_date\": \"2024-08-30T11:26:34.088225+00:00\",\n
\ \"state\": \"failed\"\n}\n"
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Length:
- '711'
Content-Type:
- application/json
Date:
- Fri, 30 Aug 2024 11:27:03 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 200
message: OK
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: http://airflow-webserver:8080/api/v1/dags/author_create_approved_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_approved_dag'
and DagRun ID: '00000000-0000-0000-0000-000000000001' not found\",\n \"status\":
404,\n \"title\": \"DAGRun not found\",\n \"type\": \"https://airflow.apache.org/docs/apache-airflow/2.9.3/stable-rest-api-ref.html#section/Errors/NotFound\"\n}\n"
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Length:
- '294'
Content-Type:
- application/problem+json
Date:
- Fri, 30 Aug 2024 11:27:03 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 404
message: NOT FOUND
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
method: GET
uri: http://airflow-webserver:8080/api/v1/dags/author_create_rejected_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: "{\n \"detail\": \"DAGRun with DAG ID: 'author_create_rejected_dag'
and DagRun ID: '00000000-0000-0000-0000-000000000001' not found\",\n \"status\":
404,\n \"title\": \"DAGRun not found\",\n \"type\": \"https://airflow.apache.org/docs/apache-airflow/2.9.3/stable-rest-api-ref.html#section/Errors/NotFound\"\n}\n"
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Length:
- '294'
Content-Type:
- application/problem+json
Date:
- Fri, 30 Aug 2024 11:27:03 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 404
message: NOT FOUND
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '0'
method: DELETE
uri: http://airflow-webserver:8080/api/v1/dags/author_create_initialization_dag/dagRuns/00000000-0000-0000-0000-000000000001
response:
body:
string: ''
headers:
Cache-Control:
- no-store
Connection:
- close
Content-Type:
- application/json
Date:
- Fri, 30 Aug 2024 11:27:45 GMT
Server:
- gunicorn
X-Robots-Tag:
- noindex, nofollow
status:
code: 204
message: NO CONTENT
version: 1
Loading
Loading