Skip to content

Commit

Permalink
Merge pull request #29 from Sage-Bionetworks-Workflows/bwmac/IBCDPE-6…
Browse files Browse the repository at this point in the history
…91/workflow-tasks-logs

[IBCDPE-691] Adds Ability to Retrieve Workflow Task Logs from Tower
  • Loading branch information
BWMac authored Oct 26, 2023
2 parents 7f63c0e + 552e08c commit ea1a993
Show file tree
Hide file tree
Showing 7 changed files with 3,311 additions and 2,231 deletions.
5,137 changes: 2,909 additions & 2,228 deletions Pipfile.lock

Large diffs are not rendered by default.

48 changes: 46 additions & 2 deletions src/orca/services/nextflowtower/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def request_paged(self, method: str, path: str, **kwargs) -> dict[str, Any]:
while num_items < total_size:
kwargs["params"]["offset"] = num_items
json = self.request_json(method, path, **kwargs)
total_size = json.pop("totalSize")
total_size = json.pop("totalSize", None) or json.pop("total", 0)
key_name, items = json.popitem()
num_items += len(items)
all_items.extend(items)
Expand All @@ -134,7 +134,7 @@ def get(self, path: str, **kwargs) -> dict[str, Any]:
A dictionary from deserializing the JSON response.
"""
json = self.request_json("GET", path, **kwargs)
if "totalSize" in json:
if "totalSize" in json or "total" in json:
json = self.request_paged("GET", path, **kwargs)
return json

Expand Down Expand Up @@ -369,3 +369,47 @@ def list_workflows(
json = self.get(path=path, params=params)
items = self.unwrap(json, "workflows")
return [models.Workflow.from_json(item["workflow"]) for item in items]

def get_workflow_tasks(
self,
workflow_id: str,
workspace_id: Optional[int] = None,
) -> list[models.WorkflowTask]:
"""Retrieve the details of a workflow run's tasks.
Args:
workflow_id: The ID number for a workflow run to
get tasks from.
workspace_id: The ID number of the workspace the workflow
exists within. Defaults to None.
Returns:
List of WorkflowTask objects.
"""
path = f"/workflow/{workflow_id}/tasks"
params = self.generate_params(workspace_id)
json = self.get(path=path, params=params)
items = self.unwrap(json, "tasks")
return [models.WorkflowTask.from_json(item["task"]) for item in items]

def get_task_logs(
self, workflow_id: str, task_id: int, workspace_id: Optional[int]
) -> str:
"""Retrieve the logs for a given workflow task.
Args:
workflow_id: The ID number for a workflow run the
tasks belongs to.
task_id: The task_id for the task to get logs from.
workspace_id: The ID number of the workspace the workflow
exists within. Defaults to None.
Returns:
WorkflowTask Execution logs.
"""
path = f"/workflow/{workflow_id}/log/{task_id}"
params = self.generate_params(workspace_id)
json = self.get(path=path, params=params)
items = self.unwrap(json, "log")
log_list = items["entries"]
return "\n".join(log_list)
42 changes: 42 additions & 0 deletions src/orca/services/nextflowtower/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,45 @@ def __repr__(self) -> str:
def status(self) -> WorkflowStatus:
"""Workflow run status."""
return WorkflowStatus(self.state)


@dataclass(kw_only=False)
class WorkflowTask(BaseTowerModel):
"""Nextflow Tower workflow task details."""

id: int
task_id: int
status: str
name: str
module: list[str]
queue: str
memory: Optional[int]
script: str
tag: Optional[str]
executor: str
duration: int
container: str
process: str
attempt: int
scratch: Optional[str]
work_dir: str
disk: Optional[int]
price_model: str
cost: float
error_action: Optional[str]
native_id: str
env: Optional[str]
exit_status: int
cpus: Optional[int]
machine_type: str
hash: str

_key_mapping = {
"task_id": "taskId",
"work_dir": "workdir",
"price_model": "priceModel",
"error_action": "errorAction",
"native_id": "nativeId",
"exit_status": "exitStatus",
"machine_type": "machineType",
}
30 changes: 29 additions & 1 deletion src/orca/services/nextflowtower/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from orca.services.nextflowtower.client import NextflowTowerClient
from orca.services.nextflowtower.client_factory import NextflowTowerClientFactory
from orca.services.nextflowtower.config import NextflowTowerConfig
from orca.services.nextflowtower.models import LaunchInfo, Workflow, WorkflowStatus
from orca.services.nextflowtower.models import (
LaunchInfo,
Workflow,
WorkflowStatus,
WorkflowTask,
)
from orca.services.nextflowtower.utils import increment_suffix

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -272,3 +277,26 @@ async def monitor_workflow(

logger.info(f"{workflow} is now done!")
return workflow.status

def get_workflow_tasks(self, workflow_id: str) -> list[WorkflowTask]:
"""Retrieve the details of a workflow run's tasks.
Args:
workflow_id: Workflow run ID.
Returns:
List of task details.
"""
return self.client.get_workflow_tasks(workflow_id, self.workspace_id)

def get_task_logs(self, workflow_id: str, task_id: int) -> str:
"""Retrieve the execution logs for a given workflow task.
Args:
workflow_id: Workflow run ID.
task_id: Task ID.
Returns:
Task logs.
"""
return self.client.get_task_logs(workflow_id, task_id, self.workspace_id)
Loading

0 comments on commit ea1a993

Please sign in to comment.