diff --git a/covalent/_results_manager/results_manager.py b/covalent/_results_manager/results_manager.py index 4c751206a..5ed5eee61 100644 --- a/covalent/_results_manager/results_manager.py +++ b/covalent/_results_manager/results_manager.py @@ -19,6 +19,7 @@ import contextlib import os +from datetime import datetime from pathlib import Path from typing import Dict, List, Optional @@ -516,3 +517,111 @@ def get_result( app_log.error(re) num_attempts += 1 raise RuntimeError("Timed out waiting for result. Please retry or check dispatch.") + +def get_all_results(dispatch_id_lst: list[str] = [], + wait: bool = False, + dispatcher_addr: str = None, + status_only: bool = False, + *, + results_dir: Optional[str] = None, + workflow_output: bool = True, + intermediate_outputs: bool = True, + sublattice_results: bool = True, + qelectron_db: bool = False, + completed_check : Optional[bool] = False, + started_before : Optional[datetime] = None, + started_after : Optional[datetime] = None, + completed_before : Optional[datetime] = None, + completed_after: Optional[datetime] = None +) -> list(Result): + """ + Get all the results from a list of dispatch ids. + + Args: + dispatch_id_lst: A list of dispatch ids. + wait: Controls how long the method waits for the server to return a + result. If False, the method will not wait and will return the + current status of the workflow. If True, the method will wait for + the result to finish and keep retrying for sys.maxsize. + dispatcher_addr: Dispatcher server address. Defaults to the address set + in Covalent's config. + status_only: If true, only returns result status, not the full result + object. Default is False. + + Kwargs: + results_dir: The directory where the results are stored in dispatch + id named folders. + workflow_output: Whether to return the workflow output. + Defaults to True. + intermediate_outputs: Whether to return all intermediate outputs in the + compute graph. Defaults to True. + sublattice_results: Whether to recursively retrieve sublattice results. + Default is True. + qelectron_db: Whether to load the bytes data of qelectron_db. Default + is False. + completed_check : Whether to only include completed results. + started_before: Only return results that started before this time. + started_after: Only return results that started after this time. + completed_before: Only return results that completed before this time. + If the result as not been completed, it will not appear in the list. + completed_after: Only return results that completed after this time. + If the result as not been completed, it will not appear in the list. + + Returns: + A list of Result objects from the Covalent server that meet the query. + """ + + # This implementation depends on the results being stored in the same + # machine. May need a better implemenation to fix. As a work around, the + # user can specify the dispatch_id_lst, otherwise it will default to the + # machine specific implementation. + if len(dispatch_id_lst) == 0: + try: + dispatch_dir = get_config()[dispatcher_addr][results_dir] + dispatch_id_lst = os.listdir(dispatch_dir) + except: + raise ValueError("No dispatch ids specified and no default directory found.") + + # Initialize the list of results + result_lst = [] + + for _id in dispatch_id_lst: + try: + # Depends on get_result method, works as normal + result = get_result(_id, wait, dispatcher_addr, status_only, + results_dir, workflow_output, + intermediate_outputs, sublattice_results, + qelectron_db) + + # Does not follow PEP8 for better readability of conditional logic. + # The logic could be simplified. This is it currently as follows: + # If the result does not meet the query, then exclude it from the + # list. started_before and started_after are datetime objects that + # determine the time range for the start time. completed_before and + # completed_after are datetime objects that determine the time range + # for the end time. If the result does not meet the query, then + # it is excluded from the list. + + # Check if result has been completed to avoid errors (if not + # completed, there is no end time) + if result.status != "COMPLETED": + if (not ( + (started_before is not None and result.start_time > started_before) or \ + (started_after is not None and result.start_time < started_after) + )) and (not completed_check): + result_lst.append({"dispatch_id": _id, "result": result}) + else: + if not ((started_before is not None and result.start_time > started_before) or \ + (started_after is not None and result.start_time < started_after) or \ + (completed_before is not None and result.end_time > completed_before) or \ + (completed_after is not None and result.end_time < completed_after)): + + result_lst.append({"dispatch_id": _id, "result": result}) + + + # If the record does not exist, a MissingLatticeRecordError is raised, + # which is caught and ignored + except MissingLatticeRecordError: + continue + + return result_lst