Skip to content

Commit

Permalink
workflows: fixed accesses to authors api
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul committed Nov 18, 2024
1 parent 681c501 commit f2a486c
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 26 deletions.
12 changes: 4 additions & 8 deletions workflows/dags/author/author_create/author_create_approved.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,15 @@ def author_create_approved_dag():
"""
inspire_http_hook = InspireHttpHook()
inspire_http_record_management_hook = InspireHTTPRecordManagementHook()
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)
workflow_ticket_management_hook = AuthorWorkflowTicketManagementHook()

@task()
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
workflow_id=context["params"]["workflow_id"]
)

@task.branch()
Expand Down Expand Up @@ -106,8 +105,7 @@ def create_author_on_inspire(**context: dict) -> str:
workflow_data["data"]["control_number"] = control_number
workflow_management_hook.partial_update_workflow(
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={"data": workflow_data["data"]},
collection=AUTHORS,
workflow_partial_update_data={"data": workflow_data["data"]}
)
return status

Expand All @@ -125,8 +123,7 @@ def set_author_create_workflow_status_to_completed(**context: dict) -> None:
status_name = "completed"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
workflow_id=context["params"]["workflow_id"]
)

@task
Expand All @@ -142,7 +139,6 @@ def set_author_create_workflow_status_to_error(**context: dict) -> None:
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
)

# task definitions
Expand Down
8 changes: 3 additions & 5 deletions workflows/dags/author/author_create/author_create_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,15 @@ def author_create_initialization_dag():
"""
inspire_http_hook = InspireHttpHook()
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)
workflow_ticket_management_hook = AuthorWorkflowTicketManagementHook()

@task()
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
workflow_id=context["params"]["workflow_id"]
)

@task()
Expand All @@ -56,8 +55,7 @@ def set_schema(**context):
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={
"data": {**context["params"]["data"], "$schema": schema}
},
collection=AUTHORS,
}
)

@task()
Expand Down
8 changes: 3 additions & 5 deletions workflows/dags/author/author_create/author_create_rejected.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,22 @@ def author_create_rejected_dag() -> None:
2. set_author_create_workflow_status_to_completed: Sets the status of
the author creation workflow to 'completed'.
"""
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)

@task()
def set_author_create_workflow_status_to_completed(**context: dict) -> None:
status_name = "completed"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
workflow_id=context["params"]["workflow_id"]
)

@task()
def set_workflow_status_to_running(**context):
status_name = "running"
workflow_management_hook.set_workflow_status(
status_name=status_name,
workflow_id=context["params"]["workflow_id"],
collection=AUTHORS,
workflow_id=context["params"]["workflow_id"]
)

# task definitions
Expand Down
2 changes: 1 addition & 1 deletion workflows/dags/author/author_update/author_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def author_update_dag():
"""
inspire_http_hook = InspireHttpHook()
inspire_http_record_management_hook = InspireHTTPRecordManagementHook()
workflow_management_hook = WorkflowManagementHook()
workflow_management_hook = WorkflowManagementHook(AUTHORS)
workflow_ticket_management_hook = AuthorWorkflowTicketManagementHook()

@task()
Expand Down
17 changes: 10 additions & 7 deletions workflows/plugins/hooks/backoffice/workflow_management_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ class WorkflowManagementHook(BackofficeHook):
:type http_conn_id: str
"""

def __init__(self,collection):
super().__init__()
self.collection = collection

def set_workflow_status(
self, status_name: str, workflow_id: str, collection: str
self, status_name: str, workflow_id: str
) -> Response:
"""
Updates the status of a workflow in the backoffice system.
Expand All @@ -33,20 +37,19 @@ def set_workflow_status(
}
return self.partial_update_workflow(
workflow_partial_update_data=request_data,
workflow_id=workflow_id,
collection=collection,
workflow_id=workflow_id
)

def get_workflow(self, workflow_id: str) -> dict:
endpoint = f"api/workflows/{workflow_id}"
endpoint = f"api/workflows/{self.collection}/{workflow_id}"
response = self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs, method="GET", endpoint=endpoint
)
response = self.run(endpoint=endpoint, headers=self.headers)
return response.json()

def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
endpoint = f"api/workflows/{workflow_id}/"
endpoint = f"api/workflows/{self.collection}/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PUT",
Expand All @@ -55,9 +58,9 @@ def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
)

def partial_update_workflow(
self, workflow_id: str, workflow_partial_update_data: dict, collection: str
self, workflow_id: str, workflow_partial_update_data: dict
) -> Response:
endpoint = f"api/workflows/{collection}/{workflow_id}/"
endpoint = f"api/workflows/{self.collection}/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PATCH",
Expand Down

0 comments on commit f2a486c

Please sign in to comment.