Skip to content

Commit

Permalink
Test runtime summary
Browse files Browse the repository at this point in the history
  • Loading branch information
kks32 committed Sep 30, 2024
1 parent 1834ed2 commit 77f585f
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 80 deletions.
2 changes: 1 addition & 1 deletion dapi/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
"""
from .dir import get_ds_path_uri
from .jobs import get_status, runtime_summary, generate_job_info, get_archive_path
from .jobs import get_status, runtime_summary, generate_job_info
116 changes: 37 additions & 79 deletions dapi/jobs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,89 +129,47 @@ def get_status(t, mjobUuid, tlapse=15):
return status


def runtime_summary(t, job_uuid, verbose=False):
    """Print a runtime summary for a job.

    The job history is fetched with ``t.jobs.getJobHistory`` and the time
    spent in the ``QUEUED`` and ``RUNNING`` states is printed, followed by the
    total time between the first and the last history event.

    Args:
        t (object): The Tapis v3 client object.
        job_uuid (str): The UUID of the job for which the runtime needs to be
            determined.
        verbose (bool): If True, every history event is printed before the
            summary. Otherwise only the QUEUED/RUNNING/TOTAL times are printed.

    Returns:
        None: This function doesn't return a value, but it prints the runtime
        details.

    Raises:
        ValueError: If a history timestamp matches neither supported format.
    """
    from datetime import datetime

    # History events whose duration is reported in the summary.
    tracked_states = ("RUNNING", "QUEUED")

    def parse_timestamp(stamp):
        # The primary format carries fractional seconds. The fallback accepts
        # the same layout without them.
        # NOTE(review): whether the service ever omits the fraction is an
        # assumption -- confirm against real job histories.
        try:
            return datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S.%fZ")
        except ValueError:
            return datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%SZ")

    def format_timedelta(td):
        # Render as HH:MM:SS; hours are not wrapped at 24.
        hours, remainder = divmod(td.total_seconds(), 3600)
        minutes, seconds = divmod(remainder, 60)
        return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"

    print("\nRuntime Summary")
    print("---------------")

    hist = t.jobs.getJobHistory(jobUuid=job_uuid)

    # Without any event there is nothing to measure; indexing hist[0] and
    # hist[-1] below would raise IndexError.
    if not hist:
        print("No job history available.")
        print("---------------")
        return

    # Parse every timestamp exactly once and reuse the results below.
    stamps = [parse_timestamp(event.created) for event in hist]
    total_time = stamps[-1] - stamps[0]

    if verbose:
        print("\nDetailed Job History:")
        for event in hist:
            print(
                f"Event: {event.event}, Detail: {event.eventDetail}, Time: {event.created}"
            )
        print("\nSummary:")

    # The time spent in a state is the gap between its event and the next one,
    # so the last event never contributes a duration of its own.
    for index, event in enumerate(hist[:-1]):
        if event.eventDetail in tracked_states:
            elapsed = stamps[index + 1] - stamps[index]
            print(f"{event.eventDetail} time:", format_timedelta(elapsed))

    print("TOTAL time:", format_timedelta(total_time))
    print("---------------")


def get_archive_path(ag, job_id):
    """
    Get the archive path for a given job ID with the leading user directory
    replaced by '/home/jupyter/MyData'.

    Only the first path segment (the user directory) is rewritten; later
    segments that happen to contain the same text are left untouched.

    Args:
        ag (object): The Agave object to interact with the platform.
        job_id (str): The job ID to retrieve the archive path for.

    Returns:
        str: The modified archive path.

    Raises:
        ValueError: If the archivePath format is unexpected, i.e. it contains
            no '/' separator or its leading segment is empty.
    """

    # Fetch the job info.
    job_info = ag.jobs.get(jobId=job_id)

    # Split off the first path segment, which is taken to be the user.
    user, separator, remainder = job_info.archivePath.partition("/")
    if not separator or not user:
        raise ValueError(f"Unexpected archivePath format for jobId={job_id}")

    # Rebuild the path from the remainder instead of using str.replace, which
    # would also rewrite any later occurrence of the user name.
    return f"/home/jupyter/MyData/{remainder}"
147 changes: 147 additions & 0 deletions tests/jobs/test_runtime_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import unittest
from unittest.mock import Mock
from io import StringIO
import sys
from datetime import datetime, timedelta
import re

import dapi as ds


class TestRuntimeSummary(unittest.TestCase):
    """Checks the text printed by ``ds.jobs.runtime_summary`` for a fixed job history."""

    def setUp(self):
        super().setUp()
        self.t_mock = Mock()

        # Fixed reference time so the expected durations are deterministic.
        base = datetime(2024, 9, 30, 14, 0, 0)
        stamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"

        # (eventDetail, offset from the reference time) for each history event.
        timeline = (
            ("PENDING", timedelta(seconds=0)),
            ("PROCESSING_INPUTS", timedelta(seconds=3)),
            ("STAGING_INPUTS", timedelta(seconds=7)),
            ("STAGED", timedelta(seconds=11)),
            ("STAGING_JOB", timedelta(seconds=18)),
            ("SUBMITTING", timedelta(seconds=30)),
            ("QUEUED", timedelta(seconds=48)),
            ("RUNNING", timedelta(minutes=1, seconds=12)),
            ("CLEANING_UP", timedelta(minutes=2, seconds=36)),
            ("ARCHIVING", timedelta(minutes=2, seconds=36)),
            ("FINISHED", timedelta(minutes=2, seconds=48)),
        )
        self.job_history = [
            Mock(
                event="JOB_NEW_STATUS",
                eventDetail=detail,
                created=(base + offset).strftime(stamp_format),
            )
            for detail, offset in timeline
        ]

    def capture_output(self, t_mock, job_id, verbose):
        """Run ``runtime_summary`` and return what it printed, stripped."""
        buffer = StringIO()
        original_stdout, sys.stdout = sys.stdout, buffer
        try:
            ds.jobs.runtime_summary(t_mock, job_id, verbose)
        finally:
            # Always restore the real stdout, even if the call raised.
            sys.stdout = original_stdout
        return buffer.getvalue().strip()

    def test_runtime_summary_verbose_true(self):
        self.t_mock.jobs.getJobHistory.return_value = self.job_history
        output = self.capture_output(self.t_mock, "mock_id", True)

        # Structural markers, independent of the exact timestamps.
        for marker in ("Runtime Summary", "---------------", "Detailed Job History:"):
            self.assertIn(marker, output)

        # Every history event must be listed in the detailed section.
        for event in self.job_history:
            self.assertIn(f"Event: {event.event}, Detail: {event.eventDetail}", output)

        # Summary section with the aggregated durations.
        self.assertIn("Summary:", output)
        for pattern in (
            r"QUEUED\s+time: 00:00:24",
            r"RUNNING\s+time: 00:01:24",
            r"TOTAL\s+time: 00:02:48",
        ):
            self.assertRegex(output, pattern)

    def test_runtime_summary_verbose_false(self):
        self.t_mock.jobs.getJobHistory.return_value = self.job_history
        output = self.capture_output(self.t_mock, "mock_id", False)

        # Overall structure.
        for marker in ("Runtime Summary", "---------------"):
            self.assertIn(marker, output)

        # Each reported status together with its duration.
        for pattern in (
            r"QUEUED\s+time:\s+00:00:24",
            r"RUNNING\s+time:\s+00:01:24",
            r"TOTAL\s+time:\s+00:02:48",
        ):
            self.assertRegex(output, pattern)

        # The statuses must appear in this order in the output.
        positions = list(map(output.index, ("QUEUED", "RUNNING", "TOTAL")))
        self.assertEqual(
            positions,
            sorted(positions),
            "Statuses are not in the expected order",
        )


# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
unittest.main()

0 comments on commit 77f585f

Please sign in to comment.