diff --git a/README.md b/README.md index c08973e..3123ee5 100644 --- a/README.md +++ b/README.md @@ -6,13 +6,13 @@ [![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE.md) [![Docs](https://img.shields.io/badge/view-docs-8A2BE2?color=8A2BE2)](https://designsafe-ci.github.io/dapi/dapi/index.html) -`dapi` is a library that simplifies the process of submitting, running, and monitoring [TAPIS v2 / AgavePy](https://agavepy.readthedocs.io/en/latest/index.html) jobs on [DesignSafe](https://designsafe-ci.org) via [Jupyter Notebooks](https://jupyter.designsafe-ci.org). +`dapi` is a library that simplifies the process of submitting, running, and monitoring [TAPIS v3](https://tapis.readthedocs.io/en/latest/) jobs on [DesignSafe](https://designsafe-ci.org) via [Jupyter Notebooks](https://jupyter.designsafe-ci.org). ## Features ### Jobs -* Simplified TAPIS v2 Calls: No need to fiddle with complex API requests. `dapi` abstracts away the complexities. +* Get TAPIS v3 templates for jobs: No need to fiddle with complex API requests. `dapi` abstracts away the complexities. * Seamless Integration with DesignSafe Jupyter Notebooks: Launch DesignSafe applications directly from the Jupyter environment. @@ -53,6 +53,15 @@ pip install git+https://github.com/DesignSafe-CI/dapi.git --quiet ## Example usage: +### Storing credentials + +Dapi uses the Tapis v3 SDK to authenticate with the DesignSafe API. To store your credentials, create a `.env` file in the root of your project with the following content: + +```shell +DESIGNSAFE_USERNAME= +DESIGNSAFE_PASSWORD= +``` + ### Jobs * [Jupyter Notebook Templates](example-notebooks/template-mpm-run.ipynb) using dapi. 
@@ -66,7 +75,7 @@ Install the latest version of `dapi` and restart the kernel (Kernel >> Restart K ```python # Remove any previous installations !pip uninstall dapi -y -# Install +# Install !pip install dapi --quiet ``` @@ -122,10 +131,6 @@ To run the unit test poetry run pytest -v ``` -## Known Issues - -The project only works on `Python 3.9` due to AgavePy Issue [#125](https://github.com/TACC/agavepy/issues/125). - ## License diff --git a/dapi/__init__.py b/dapi/__init__.py index 7d609d2..1e92dc8 100644 --- a/dapi/__init__.py +++ b/dapi/__init__.py @@ -1,13 +1,34 @@ """ -`dapi` is a library that simplifies the process of submitting, running, and monitoring [TAPIS v2 / AgavePy](https://agavepy.readthedocs.io/en/latest/index.html) jobs on [DesignSafe](https://designsafe-ci.org) via [Jupyter Notebooks](https://jupyter.designsafe-ci.org). - +`dapi` is a library that simplifies the process of submitting, running, and monitoring [TAPIS v3](https://tapis.readthedocs.io/en/latest/) jobs on [DesignSafe](https://designsafe-ci.org) via [Jupyter Notebooks](https://jupyter.designsafe-ci.org). ## Features -* Simplified TAPIS v2 Calls: No need to fiddle with complex API requests. `dapi` abstracts away the complexities. +### Jobs + +* Get TAPIS v3 templates for jobs: No need to fiddle with complex API requests. `dapi` abstracts away the complexities. * Seamless Integration with DesignSafe Jupyter Notebooks: Launch DesignSafe applications directly from the Jupyter environment. 
+### Database + +Connects to SQL databases on DesignSafe: + +| Database | dbname | env_prefix | +|----------|--------|------------| +| NGL | `ngl`| `NGL_` | +| Earthquake Recovery | `eq` | `EQ_` | +| Vp | `vp` | `VP_` | + +Define the following environment variables: +``` +{env_prefix}DB_USER +{env_prefix}DB_PASSWORD +{env_prefix}DB_HOST +{env_prefix}DB_PORT +``` + +For example, to add the environment variable `NGL_DB_USER` edit `~/.bashrc`, `~/.zshrc`, or a similar shell-specific configuration file for the current user and add `export NGL_DB_USER="dspublic"`. + ## Installation ```shell @@ -15,6 +36,7 @@ ``` """ +from . import apps from . import auth from . import db from . import jobs diff --git a/dapi/apps/__init__.py b/dapi/apps/__init__.py new file mode 100644 index 0000000..e7bb0ef --- /dev/null +++ b/dapi/apps/__init__.py @@ -0,0 +1,3 @@ +from .apps import find_apps, get_app_version + +__all__ = ["find_apps", "get_app_version"] diff --git a/dapi/apps/apps.py b/dapi/apps/apps.py new file mode 100644 index 0000000..c294f7d --- /dev/null +++ b/dapi/apps/apps.py @@ -0,0 +1,57 @@ +from tapipy.tapis import Tapis +from typing import List, Dict, Any, Optional + + +def find_apps( + t: Tapis, search_term: str, list_type: str = "ALL", verbose: bool = True +) -> List[Any]: + """ + Search for Tapis apps matching a search term. 
+ + Args: + t (Tapis): Tapis client instance + search_term (str): Name or partial name to search for + list_type (str): One of 'OWNED', 'SHARED_PUBLIC', 'SHARED_DIRECT', 'READ_PERM', 'MINE', 'ALL' + verbose (bool): If True, prints all found apps + + Returns: + List[Any]: List of matching app objects + """ + results = t.apps.getApps(search=f"(id.like.*{search_term}*)", listType=list_type) + + if verbose: + if not results: + print(f"No apps found matching '{search_term}'") + else: + print(f"\nFound {len(results)} matching apps:") + for app in results: + print(f"- {app.id}") + print() + + return results + + +def get_app_version(t: Tapis, app_id: str, verbose: bool = True) -> Optional[Any]: + """ + Get latest version info for a specific app ID. + + Args: + t (Tapis): Tapis client instance + app_id (str): Exact app ID to look up + verbose (bool): If True, prints basic app info + + Returns: + Optional[Any]: Latest version info for the app, or None if not found + """ + try: + app_info = t.apps.getAppLatestVersion(appId=app_id) + if verbose: + print(f"App: {app_info.id}") + print(f"Version: {app_info.version}") + print(f"System: {app_info.jobAttributes.execSystemId}") + return app_info + except Exception as e: + print(f"Error getting app info for '{app_id}': {str(e)}") + print("\nCouldn't find exact match. Here are similar apps:") + _ = find_apps(t, app_id) + return None diff --git a/dapi/auth/auth.py b/dapi/auth/auth.py index 7cbd19c..eb1d67f 100644 --- a/dapi/auth/auth.py +++ b/dapi/auth/auth.py @@ -1,30 +1,42 @@ -from agavepy.agave import Agave -from collections.abc import Mapping +import os +from getpass import getpass +from tapipy.tapis import Tapis +from dotenv import load_dotenv -def init(username, password): +def init(): """ - Initialize an Agave object with a new client and an active token. + Initialize a Tapis object with authentication. + Tries to read credentials from environment variables first. + If not found, prompts the user for input. 
- Args: - username (str): The username. - password (str): The password. + Save the user credentials in the .env file. + ``` + DESIGNSAFE_USERNAME= + DESIGNSAFE_PASSWORD= + ``` Returns: - object: The Agave object. + object: The authenticated Tapis object. """ - # Authenticate with Agave - ag = Agave( - base_url="https://agave.designsafe-ci.org", username=username, password=password - ) - # Create a new client - new_client = ag.clients_create() - # create a new ag object with the new client, at this point ag will have a new token - ag = Agave( - base_url="https://agave.designsafe-ci.org", - username=username, - password=password, - api_key=new_client["api_key"], - api_secret=new_client["api_secret"], - ) - return ag + base_url = "https://designsafe.tapis.io" + + # Load environment variables from .env file + load_dotenv() + + # Try to get credentials from environment variables + username = os.getenv("DESIGNSAFE_USERNAME") + password = os.getenv("DESIGNSAFE_PASSWORD") + + # If environment variables are not set, prompt user for input + if not username: + username = input("Enter username: ") + if not password: + password = getpass("Enter password: ") + + # Initialize Tapis object + t = Tapis(base_url=base_url, username=username, password=password) + + t.get_tokens() + + return t diff --git a/dapi/jobs/__init__.py b/dapi/jobs/__init__.py index 065a079..1564ff0 100644 --- a/dapi/jobs/__init__.py +++ b/dapi/jobs/__init__.py @@ -1,14 +1,14 @@ """ -`dapi` job submodule simplifies the process of submitting, running, and monitoring [TAPIS v2 / AgavePy](https://agavepy.readthedocs.io/en/latest/index.html) jobs on [DesignSafe](https://designsafe-ci.org) via [Jupyter Notebooks](https://jupyter.designsafe-ci.org). +`dapi` job submodule simplifies the process of submitting, running, and monitoring [Tapis v3](https://tapis.readthedocs.io/en/latest/) jobs on [DesignSafe](https://designsafe-ci.org) via [Jupyter Notebooks](https://jupyter.designsafe-ci.org). 
## Features -* Simplified TAPIS v2 Calls: No need to fiddle with complex API requests. `dapi` abstracts away the complexities. +* Simplified TAPIS v3 Calls: No need to fiddle with complex API requests. `dapi` abstracts away the complexities. * Seamless Integration with DesignSafe Jupyter Notebooks: Launch DesignSafe applications directly from the Jupyter environment. -## Installation +# Installation ```shell pip3 install dapi @@ -16,4 +16,4 @@ """ from .dir import get_ds_path_uri -from .jobs import get_status, runtime_summary, generate_job_info, get_archive_path +from .jobs import get_status, runtime_summary, generate_job_info diff --git a/dapi/jobs/dir.py b/dapi/jobs/dir.py index f2325e8..c0f3627 100644 --- a/dapi/jobs/dir.py +++ b/dapi/jobs/dir.py @@ -1,21 +1,21 @@ import os +from tapipy.tapis import Tapis -def get_ds_path_uri(ag, path): +def get_ds_path_uri(t: Tapis, path: str) -> str: """ - Given a path on DesignSafe, determine the correct input URI. + Given a path on DesignSafe, determine the correct input URI for Tapis v3. Args: - ag (object): Agave object to fetch profiles or metadata. - path (str): The directory path. + t (Tapis): Tapis object to fetch profiles or metadata. + path (str): The directory path. Returns: - str: The corresponding input URI. + str: The corresponding input URI. Raises: - ValueError: If no matching directory pattern is found. + ValueError: If no matching directory pattern is found. """ - # If any of the following directory patterns are found in the path, # process them accordingly. 
directory_patterns = [ @@ -28,9 +28,9 @@ def get_ds_path_uri(ag, path): for pattern, storage, use_username in directory_patterns: if pattern in path: - path = path.split(pattern).pop() - input_dir = ag.profiles.get()["username"] + path if use_username else path - input_uri = f"agave://{storage}/{input_dir}" + path = path.split(pattern, 1)[1].lstrip("/") + input_dir = f"{t.username}/{path}" if use_username else path + input_uri = f"tapis://{storage}/{input_dir}" return input_uri.replace(" ", "%20") project_patterns = [ @@ -40,12 +40,15 @@ def get_ds_path_uri(ag, path): for pattern, prefix in project_patterns: if pattern in path: - path = path.split(pattern + "/").pop() - project_id = path.split("/")[0] - query = {"value.projectId": str(project_id)} - path = path.split(project_id).pop() - project_uuid = ag.meta.listMetadata(q=str(query))[0]["uuid"] - input_uri = f"agave://{prefix}{project_uuid}{path}" + path = path.split(pattern, 1)[1].lstrip("/") + project_id, *rest = path.split("/", 1) + path = rest[0] if rest else "" + + # Using Tapis v3 to get project UUID + resp = t.get(f"https://designsafe-ci.org/api/projects/v2/{project_id}") + project_uuid = resp.json()["baseProject"]["uuid"] + + input_uri = f"tapis://{prefix}{project_uuid}/{path}" return input_uri.replace(" ", "%20") raise ValueError(f"No matching directory pattern found for: {path}") diff --git a/dapi/jobs/jobs.py b/dapi/jobs/jobs.py index 62f5c7a..3b51c12 100644 --- a/dapi/jobs/jobs.py +++ b/dapi/jobs/jobs.py @@ -2,6 +2,8 @@ from datetime import datetime, timedelta, timezone from tqdm import tqdm import logging +import json +from typing import Dict, Any, Optional # Configuring the logging system # logging.basicConfig( @@ -9,51 +11,112 @@ # ) -def get_status(ag, job_id, time_lapse=15): +def generate_job_info( + t: Any, # Tapis client + app_name: str, + input_uri: str, + input_file: str, + job_name: str = None, + max_minutes: Optional[int] = None, + node_count: Optional[int] = None, + cores_per_node: 
Optional[int] = None, + queue: Optional[str] = None, + allocation: Optional[str] = None, +) -> Dict[str, Any]: """ - Retrieves and monitors the status of a job from Agave. - - This function initially waits for the job to start, displaying its progress using - a tqdm progress bar. Once the job starts, it monitors the job's status up to - a maximum duration specified by the job's "maxHours". If the job completes or fails - before reaching this maximum duration, it returns the job's final status. + Generates a job info dictionary based on the provided application name, job name, input URI, input file, and optional allocation. Args: - ag (object): The Agave job object used to interact with the job. - job_id (str): The unique identifier of the job to monitor. - time_lapse (int, optional): Time interval, in seconds, to wait between status - checks. Defaults to 15 seconds. + t (object): The Tapis API client object. + app_name (str): The name of the application to use for the job. + input_uri (str): The URI of the input data for the job. + input_file (str): The local file path to the input data for the job. + job_name (str, optional): The name of the job to be created. Defaults to None. + max_minutes (int, optional): The maximum number of minutes the job can run. Defaults to None. + node_count (int, optional): The number of nodes to use for the job. Defaults to None. + cores_per_node (int, optional): The number of cores per node for the job. Defaults to None. + queue (str, optional): The queue to use for the job. Defaults to None. + allocation (str, optional): The allocation to use for the job. Defaults to None. Returns: - str: The final status of the job. Typical values include "FINISHED", "FAILED", - and "STOPPED". + dict: The job info dictionary. 
+ """ + + # Fetch the latest app information + app_info = t.apps.getAppLatestVersion(appId=app_name) + + # If job_name is not provided, use the app name and date + if not job_name: + job_name = f"{app_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + # Create the base job info + job_info = { + "name": job_name, + "appId": app_info.id, + "appVersion": app_info.version, + "execSystemId": app_info.jobAttributes.execSystemId, + "maxMinutes": max_minutes or app_info.jobAttributes.maxMinutes, + "archiveOnAppError": app_info.jobAttributes.archiveOnAppError, + "fileInputs": [{"name": "Input Directory", "sourceUrl": input_uri}], + "execSystemLogicalQueue": queue + or app_info.jobAttributes.execSystemLogicalQueue, + "nodeCount": node_count or 1, # Default to 1 if not specified + "coresPerNode": cores_per_node or 1, # Default to 1 if not specified + } + + # Handle input file based on app name + if "opensees" in app_name.lower(): + job_info["parameterSet"] = { + "envVariables": [{"key": "tclScript", "value": input_file}] + } + else: + job_info["parameterSet"] = { + "appArgs": [{"name": "Input Script", "arg": input_file}] + } + + # Add TACC allocation if provided + if allocation: + if "schedulerOptions" not in job_info["parameterSet"]: + job_info["parameterSet"]["schedulerOptions"] = [] + job_info["parameterSet"]["schedulerOptions"].append( + {"name": "TACC Allocation", "arg": f"-A {allocation}"} + ) + + return job_info - Raises: - No exceptions are explicitly raised, but potential exceptions raised by the Agave - job object or other called functions/methods will propagate. + +def get_status(t, mjobUuid, tlapse=15): """ + Retrieves and monitors the status of a job using Tapis API. + This function waits for the job to start, then monitors it for up to maxMinutes. + + Args: + t (object): The Tapis API client object. + mjobUuid (str): The unique identifier of the job to monitor. + tlapse (int, optional): Time interval, in seconds, to wait between status checks. 
Defaults to 15 seconds. + Returns: + str: The final status of the job (FINISHED, FAILED, or STOPPED). + """ previous_status = None # Initially check if the job is already running - status = ag.jobs.getStatus(jobId=job_id)["status"] - - job_details = ag.jobs.get(jobId=job_id) - max_hours = job_details["maxHours"] + status = t.jobs.getJobStatus(jobUuid=mjobUuid).status + max_minutes = t.jobs.getJob(jobUuid=mjobUuid).maxMinutes # Using tqdm to provide visual feedback while waiting for job to start with tqdm(desc="Waiting for job to start", dynamic_ncols=True) as pbar: while status not in ["RUNNING", "FINISHED", "FAILED", "STOPPED"]: - time.sleep(time_lapse) - status = ag.jobs.getStatus(jobId=job_id)["status"] + time.sleep(tlapse) + status = t.jobs.getJobStatus(jobUuid=mjobUuid).status pbar.update(1) pbar.set_postfix_str(f"Status: {status}") - # Once the job is running, monitor it for up to maxHours - max_iterations = int(max_hours * 3600 // time_lapse) + # Once the job is running, monitor it for up to maxMinutes + max_iterations = int(max_minutes * 60 // tlapse) # Using tqdm for progress bar for _ in tqdm(range(max_iterations), desc="Monitoring job", ncols=100): - status = ag.jobs.getStatus(jobId=job_id)["status"] + status = t.jobs.getJobStatus(jobUuid=mjobUuid).status # Print status if it has changed if status != previous_status: @@ -64,149 +127,57 @@ def get_status(ag, job_id, time_lapse=15): if status in ["FINISHED", "FAILED", "STOPPED"]: break - time.sleep(time_lapse) + time.sleep(tlapse) else: # This block will execute if the for loop completes without a 'break' - logging.warn("Warning: Maximum monitoring time reached!") + logging.warning( + f"Warning: Maximum monitoring time of {max_minutes} minutes reached!" + ) return status -def runtime_summary(ag, job_id, verbose=False): +def runtime_summary(t, job_uuid, verbose=False): """Get the runtime of a job. - Args: - ag (object): The Agave object that has the job details. 
- job_id (str): The ID of the job for which the runtime needs to be determined. - verbose (bool): If True, prints all statuses. Otherwise, prints only specific statuses. - + t (object): The Tapis v3 client object. + job_uuid (str): The UUID of the job for which the runtime needs to be determined. + verbose (bool): If True, prints all history events. Otherwise, prints only specific statuses. Returns: - None: This function doesn't return a value, but it prints the runtime details. - + None: This function doesn't return a value, but it prints the runtime details. """ + from datetime import datetime, timedelta - print("Runtime Summary") + print("\nRuntime Summary") print("---------------") - - job_history = ag.jobs.getHistory(jobId=job_id) - total_time = job_history[-1]["created"] - job_history[0]["created"] - - status_times = {} - - for i in range(len(job_history) - 1): - current_status = job_history[i]["status"] - elapsed_time = job_history[i + 1]["created"] - job_history[i]["created"] - - # Aggregate times for each status - if current_status in status_times: - status_times[current_status] += elapsed_time - else: - status_times[current_status] = elapsed_time - - # Filter the statuses if verbose is False - if not verbose: - filtered_statuses = { - "PENDING", - "QUEUED", - "RUNNING", - "FINISHED", - "FAILED", - } - status_times = { - status: time - for status, time in status_times.items() - if status in filtered_statuses - } - - # Determine the max width of status names for alignment - max_status_width = max(len(status) for status in status_times.keys()) - - # Print the aggregated times for each unique status in a table format - for status, time in status_times.items(): - print(f"{status.upper():<{max_status_width + 2}} time: {time}") - - print(f"{'TOTAL':<{max_status_width + 2}} time: {total_time}") + hist = t.jobs.getJobHistory(jobUuid=job_uuid) + + def format_timedelta(td): + hours, remainder = divmod(td.total_seconds(), 3600) + minutes, seconds = divmod(remainder, 
60) + return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}" + + time1 = datetime.strptime(hist[-1].created, "%Y-%m-%dT%H:%M:%S.%fZ") + time0 = datetime.strptime(hist[0].created, "%Y-%m-%dT%H:%M:%S.%fZ") + total_time = time1 - time0 + + if verbose: + print("\nDetailed Job History:") + for event in hist: + print( + f"Event: {event.event}, Detail: {event.eventDetail}, Time: {event.created}" + ) + print("\nSummary:") + + for i in range(len(hist) - 1): + if hist[i].eventDetail == "RUNNING": + time1 = datetime.strptime(hist[i + 1].created, "%Y-%m-%dT%H:%M:%S.%fZ") + time0 = datetime.strptime(hist[i].created, "%Y-%m-%dT%H:%M:%S.%fZ") + print("RUNNING time:", format_timedelta(time1 - time0)) + elif hist[i].eventDetail == "QUEUED": + time1 = datetime.strptime(hist[i + 1].created, "%Y-%m-%dT%H:%M:%S.%fZ") + time0 = datetime.strptime(hist[i].created, "%Y-%m-%dT%H:%M:%S.%fZ") + print("QUEUED time:", format_timedelta(time1 - time0)) + + print("TOTAL time:", format_timedelta(total_time)) print("---------------") - - -def generate_job_info( - ag, - appid: str, - jobname: str = "dsjob", - queue: str = "development", - nnodes: int = 1, - nprocessors: int = 1, - runtime: str = "00:10:00", - inputs=None, - parameters=None, -) -> dict: - """Generate a job information dictionary based on provided arguments. - - Args: - ag (object): The Agave object to interact with the platform. - appid (str): The application ID for the job. - jobname (str, optional): The name of the job. Defaults to 'dsjob'. - queue (str, optional): The batch queue name. Defaults to 'skx-dev'. - nnodes (int, optional): The number of nodes required. Defaults to 1. - nprocessors (int, optional): The number of processors per node. Defaults to 1. - runtime (str, optional): The maximum runtime in the format 'HH:MM:SS'. Defaults to '00:10:00'. - inputs (dict, optional): The inputs for the job. Defaults to None. - parameters (dict, optional): The parameters for the job. Defaults to None. 
- - Returns: - dict: A dictionary containing the job information. - - Raises: - ValueError: If the provided appid is not valid. - """ - - try: - app = ag.apps.get(appId=appid) - except Exception: - raise ValueError(f"Invalid app ID: {appid}") - - job_info = { - "appId": appid, - "name": jobname, - "batchQueue": queue, - "nodeCount": nnodes, - "processorsPerNode": nprocessors, - "memoryPerNode": "1", - "maxRunTime": runtime, - "archive": True, - "inputs": inputs, - "parameters": parameters, - } - - return job_info - - -def get_archive_path(ag, job_id): - """ - Get the archive path for a given job ID and modifies the user directory - to '/home/jupyter/MyData'. - - Args: - ag (object): The Agave object to interact with the platform. - job_id (str): The job ID to retrieve the archive path for. - - Returns: - str: The modified archive path. - - Raises: - ValueError: If the archivePath format is unexpected. - """ - - # Fetch the job info. - job_info = ag.jobs.get(jobId=job_id) - - # Try to split the archive path to extract the user. - try: - user, _ = job_info.archivePath.split("/", 1) - except ValueError: - raise ValueError(f"Unexpected archivePath format for jobId={job_id}") - - # Construct the new path. - new_path = job_info.archivePath.replace(user, "/home/jupyter/MyData") - - return new_path diff --git a/docs/dapi.html b/docs/dapi.html index 5d5d0e3..1e246f8 100644 --- a/docs/dapi.html +++ b/docs/dapi.html @@ -3,14 +3,14 @@ - + dapi API documentation - +