From 0734007486ff3b3ee2f6c5083da00d315405ab5e Mon Sep 17 00:00:00 2001 From: Adi Rao Date: Sun, 7 Jan 2024 14:34:30 +0530 Subject: [PATCH 1/5] Results manager get_all_results implementation --- covalent/_results_manager/results_manager.py | 109 +++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/covalent/_results_manager/results_manager.py b/covalent/_results_manager/results_manager.py index 4c751206a..68c799ea7 100644 --- a/covalent/_results_manager/results_manager.py +++ b/covalent/_results_manager/results_manager.py @@ -25,6 +25,7 @@ from furl import furl from requests.adapters import HTTPAdapter from urllib3.util import Retry +from datetime import datetime from .._api.apiclient import CovalentAPIClient from .._serialize.common import load_asset @@ -516,3 +517,111 @@ def get_result( app_log.error(re) num_attempts += 1 raise RuntimeError("Timed out waiting for result. Please retry or check dispatch.") + +def get_all_results(dispatch_id_lst: list[str] = [], + wait: bool = False, + dispatcher_addr: str = None, + status_only: bool = False, + *, + results_dir: Optional[str] = None, + workflow_output: bool = True, + intermediate_outputs: bool = True, + sublattice_results: bool = True, + qelectron_db: bool = False, + completed_check : Optional[bool] = False, + started_before : Optional[datetime] = None, + started_after : Optional[datetime] = None, + completed_before : Optional[datetime] = None, + completed_after: Optional[datetime] = None +) -> list(Result): + """ + Get all the results from a list of dispatch ids. + + Args: + dispatch_id_lst: A list of dispatch ids. + wait: Controls how long the method waits for the server to return a + result. If False, the method will not wait and will return the + current status of the workflow. If True, the method will wait for + the result to finish and keep retrying for sys.maxsize. + dispatcher_addr: Dispatcher server address. Defaults to the address set + in Covalent's config. + status_only: If true, only returns result status, not the full result + object. Default is False. + + Kwargs: + results_dir: The directory where the results are stored in dispatch + id named folders. + workflow_output: Whether to return the workflow output. + Defaults to True. + intermediate_outputs: Whether to return all intermediate outputs in the + compute graph. Defaults to True. + sublattice_results: Whether to recursively retrieve sublattice results. + Default is True. + qelectron_db: Whether to load the bytes data of qelectron_db. Default + is False. + completed_check : Whether to only include completed results. + started_before: Only return results that started before this time. + started_after: Only return results that started after this time. + completed_before: Only return results that completed before this time. + If the result as not been completed, it will not appear in the list. + completed_after: Only return results that completed after this time. + If the result as not been completed, it will not appear in the list. + + Returns: + A list of Result objects from the Covalent server that meet the query. + """ + + # This implementation depends on the results being stored in the same + # machine. May need a better implemenation to fix. As a work around, the + # user can specify the dispatch_id_lst, otherwise it will default to the + # machine specific implementation. + if len(dispatch_id_lst) == 0: + try: + dispatch_dir = get_config()[dispatcher_addr][results_dir] + dispatch_id_lst = os.listdir(dispatch_dir) + except: + raise ValueError("No dispatch ids specified and no default directory found.") + + # Initialize the list of results + result_lst = [] + + for _id in dispatch_id_lst: + try: + # Depends on get_result method, works as normal + result = get_result(_id, wait, dispatcher_addr, status_only, + results_dir, workflow_output, + intermediate_outputs, sublattice_results, + qelectron_db) + + # Does not follow PEP8 for better readability of conditional logic. + # The logic could be simplified. This is it currently as follows: + # If the result does not meet the query, then exclude it from the + # list. started_before and started_after are datetime objects that + # determine the time range for the start time. completed_before and + # completed_after are datetime objects that determine the time range + # for the end time. If the result does not meet the query, then + # it is excluded from the list. + + # Check if result has been completed to avoid errors (if not + # completed, there is no end time) + if result.status != "COMPLETED": + if (not ( + (started_before is not None and result.start_time > started_before) or \ + (started_after is not None and result.start_time < started_after) + )) and (not completed_check): + result_lst.append({"dispatch_id": _id, "result": result}) + else: + if not ((started_before is not None and result.start_time > started_before) or \ + (started_after is not None and result.start_time < started_after) or \ + (completed_before is not None and result.end_time > completed_before) or \ + (completed_after is not None and result.end_time < completed_after)): + + result_lst.append({"dispatch_id": _id, "result": result}) + + + # If the record does not exist, a MissingLatticeRecordError is raised, + # which is caught and ignored + except MissingLatticeRecordError: + continue + + return result_lst From 4682ba7467cdad417da16f1d3827e6ced2d60ac1 Mon Sep 17 00:00:00 2001 From: Adi Rao Date: Sun, 7 Jan 2024 14:35:14 +0530 Subject: [PATCH 2/5] Revert "Results manager get_all_results implementation" This reverts commit 0734007486ff3b3ee2f6c5083da00d315405ab5e. --- covalent/_results_manager/results_manager.py | 109 ------------------- 1 file changed, 109 deletions(-) diff --git a/covalent/_results_manager/results_manager.py b/covalent/_results_manager/results_manager.py index 68c799ea7..4c751206a 100644 --- a/covalent/_results_manager/results_manager.py +++ b/covalent/_results_manager/results_manager.py @@ -25,7 +25,6 @@ from furl import furl from requests.adapters import HTTPAdapter from urllib3.util import Retry -from datetime import datetime from .._api.apiclient import CovalentAPIClient from .._serialize.common import load_asset @@ -517,111 +516,3 @@ def get_result( app_log.error(re) num_attempts += 1 raise RuntimeError("Timed out waiting for result. Please retry or check dispatch.") - -def get_all_results(dispatch_id_lst: list[str] = [], - wait: bool = False, - dispatcher_addr: str = None, - status_only: bool = False, - *, - results_dir: Optional[str] = None, - workflow_output: bool = True, - intermediate_outputs: bool = True, - sublattice_results: bool = True, - qelectron_db: bool = False, - completed_check : Optional[bool] = False, - started_before : Optional[datetime] = None, - started_after : Optional[datetime] = None, - completed_before : Optional[datetime] = None, - completed_after: Optional[datetime] = None -) -> list(Result): - """ - Get all the results from a list of dispatch ids. - - Args: - dispatch_id_lst: A list of dispatch ids. - wait: Controls how long the method waits for the server to return a - result. If False, the method will not wait and will return the - current status of the workflow. If True, the method will wait for - the result to finish and keep retrying for sys.maxsize. - dispatcher_addr: Dispatcher server address. Defaults to the address set - in Covalent's config. - status_only: If true, only returns result status, not the full result - object. Default is False. - - Kwargs: - results_dir: The directory where the results are stored in dispatch - id named folders. - workflow_output: Whether to return the workflow output. - Defaults to True. - intermediate_outputs: Whether to return all intermediate outputs in the - compute graph. Defaults to True. - sublattice_results: Whether to recursively retrieve sublattice results. - Default is True. - qelectron_db: Whether to load the bytes data of qelectron_db. Default - is False. - completed_check : Whether to only include completed results. - started_before: Only return results that started before this time. - started_after: Only return results that started after this time. - completed_before: Only return results that completed before this time. - If the result as not been completed, it will not appear in the list. - completed_after: Only return results that completed after this time. - If the result as not been completed, it will not appear in the list. - - Returns: - A list of Result objects from the Covalent server that meet the query. - """ - - # This implementation depends on the results being stored in the same - # machine. May need a better implemenation to fix. As a work around, the - # user can specify the dispatch_id_lst, otherwise it will default to the - # machine specific implementation. - if len(dispatch_id_lst) == 0: - try: - dispatch_dir = get_config()[dispatcher_addr][results_dir] - dispatch_id_lst = os.listdir(dispatch_dir) - except: - raise ValueError("No dispatch ids specified and no default directory found.") - - # Initialize the list of results - result_lst = [] - - for _id in dispatch_id_lst: - try: - # Depends on get_result method, works as normal - result = get_result(_id, wait, dispatcher_addr, status_only, - results_dir, workflow_output, - intermediate_outputs, sublattice_results, - qelectron_db) - - # Does not follow PEP8 for better readability of conditional logic. - # The logic could be simplified. This is it currently as follows: - # If the result does not meet the query, then exclude it from the - # list. started_before and started_after are datetime objects that - # determine the time range for the start time. completed_before and - # completed_after are datetime objects that determine the time range - # for the end time. If the result does not meet the query, then - # it is excluded from the list. - - # Check if result has been completed to avoid errors (if not - # completed, there is no end time) - if result.status != "COMPLETED": - if (not ( - (started_before is not None and result.start_time > started_before) or \ - (started_after is not None and result.start_time < started_after) - )) and (not completed_check): - result_lst.append({"dispatch_id": _id, "result": result}) - else: - if not ((started_before is not None and result.start_time > started_before) or \ - (started_after is not None and result.start_time < started_after) or \ - (completed_before is not None and result.end_time > completed_before) or \ - (completed_after is not None and result.end_time < completed_after)): - - result_lst.append({"dispatch_id": _id, "result": result}) - - - # If the record does not exist, a MissingLatticeRecordError is raised, - # which is caught and ignored - except MissingLatticeRecordError: - continue - - return result_lst From 6154aaa86292ac35ee0fdc8ce0161a74f6666cde Mon Sep 17 00:00:00 2001 From: Adi Rao Date: Sun, 7 Jan 2024 14:36:28 +0530 Subject: [PATCH 3/5] Get all Results implementation --- covalent/_results_manager/results_manager.py | 109 +++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/covalent/_results_manager/results_manager.py b/covalent/_results_manager/results_manager.py index 4c751206a..68c799ea7 100644 --- a/covalent/_results_manager/results_manager.py +++ b/covalent/_results_manager/results_manager.py @@ -25,6 +25,7 @@ from furl import furl from requests.adapters import HTTPAdapter from urllib3.util import Retry +from datetime import datetime from .._api.apiclient import CovalentAPIClient from .._serialize.common import load_asset @@ -516,3 +517,111 @@ def get_result( app_log.error(re) num_attempts += 1 raise RuntimeError("Timed out waiting for result. Please retry or check dispatch.") + +def get_all_results(dispatch_id_lst: list[str] = [], + wait: bool = False, + dispatcher_addr: str = None, + status_only: bool = False, + *, + results_dir: Optional[str] = None, + workflow_output: bool = True, + intermediate_outputs: bool = True, + sublattice_results: bool = True, + qelectron_db: bool = False, + completed_check : Optional[bool] = False, + started_before : Optional[datetime] = None, + started_after : Optional[datetime] = None, + completed_before : Optional[datetime] = None, + completed_after: Optional[datetime] = None +) -> list(Result): + """ + Get all the results from a list of dispatch ids. + + Args: + dispatch_id_lst: A list of dispatch ids. + wait: Controls how long the method waits for the server to return a + result. If False, the method will not wait and will return the + current status of the workflow. If True, the method will wait for + the result to finish and keep retrying for sys.maxsize. + dispatcher_addr: Dispatcher server address. Defaults to the address set + in Covalent's config. + status_only: If true, only returns result status, not the full result + object. Default is False. + + Kwargs: + results_dir: The directory where the results are stored in dispatch + id named folders. + workflow_output: Whether to return the workflow output. + Defaults to True. + intermediate_outputs: Whether to return all intermediate outputs in the + compute graph. Defaults to True. + sublattice_results: Whether to recursively retrieve sublattice results. + Default is True. + qelectron_db: Whether to load the bytes data of qelectron_db. Default + is False. + completed_check : Whether to only include completed results. + started_before: Only return results that started before this time. + started_after: Only return results that started after this time. + completed_before: Only return results that completed before this time. + If the result as not been completed, it will not appear in the list. + completed_after: Only return results that completed after this time. + If the result as not been completed, it will not appear in the list. + + Returns: + A list of Result objects from the Covalent server that meet the query. + """ + + # This implementation depends on the results being stored in the same + # machine. May need a better implemenation to fix. As a work around, the + # user can specify the dispatch_id_lst, otherwise it will default to the + # machine specific implementation. + if len(dispatch_id_lst) == 0: + try: + dispatch_dir = get_config()[dispatcher_addr][results_dir] + dispatch_id_lst = os.listdir(dispatch_dir) + except: + raise ValueError("No dispatch ids specified and no default directory found.") + + # Initialize the list of results + result_lst = [] + + for _id in dispatch_id_lst: + try: + # Depends on get_result method, works as normal + result = get_result(_id, wait, dispatcher_addr, status_only, + results_dir, workflow_output, + intermediate_outputs, sublattice_results, + qelectron_db) + + # Does not follow PEP8 for better readability of conditional logic. + # The logic could be simplified. This is it currently as follows: + # If the result does not meet the query, then exclude it from the + # list. started_before and started_after are datetime objects that + # determine the time range for the start time. completed_before and + # completed_after are datetime objects that determine the time range + # for the end time. If the result does not meet the query, then + # it is excluded from the list. + + # Check if result has been completed to avoid errors (if not + # completed, there is no end time) + if result.status != "COMPLETED": + if (not ( + (started_before is not None and result.start_time > started_before) or \ + (started_after is not None and result.start_time < started_after) + )) and (not completed_check): + result_lst.append({"dispatch_id": _id, "result": result}) + else: + if not ((started_before is not None and result.start_time > started_before) or \ + (started_after is not None and result.start_time < started_after) or \ + (completed_before is not None and result.end_time > completed_before) or \ + (completed_after is not None and result.end_time < completed_after)): + + result_lst.append({"dispatch_id": _id, "result": result}) + + + # If the record does not exist, a MissingLatticeRecordError is raised, + # which is caught and ignored + except MissingLatticeRecordError: + continue + + return result_lst From a57478111a342c728903e9dfa57dccaf833db03e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 7 Jan 2024 09:17:00 +0000 Subject: [PATCH 4/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- covalent/_results_manager/results_manager.py | 42 ++++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/covalent/_results_manager/results_manager.py b/covalent/_results_manager/results_manager.py index 68c799ea7..95f427eb3 100644 --- a/covalent/_results_manager/results_manager.py +++ b/covalent/_results_manager/results_manager.py @@ -530,8 +530,8 @@ def get_all_results(dispatch_id_lst: list[str] = [], qelectron_db: bool = False, completed_check : Optional[bool] = False, started_before : Optional[datetime] = None, - started_after : Optional[datetime] = None, - completed_before : Optional[datetime] = None, + started_after : Optional[datetime] = None, + completed_before : Optional[datetime] = None, completed_after: Optional[datetime] = None ) -> list(Result): """ @@ -539,25 +539,25 @@ def get_all_results(dispatch_id_lst: list[str] = [], Args: dispatch_id_lst: A list of dispatch ids. - wait: Controls how long the method waits for the server to return a - result. If False, the method will not wait and will return the - current status of the workflow. If True, the method will wait for + wait: Controls how long the method waits for the server to return a + result. If False, the method will not wait and will return the + current status of the workflow. If True, the method will wait for the result to finish and keep retrying for sys.maxsize. - dispatcher_addr: Dispatcher server address. Defaults to the address set + dispatcher_addr: Dispatcher server address. Defaults to the address set in Covalent's config. - status_only: If true, only returns result status, not the full result + status_only: If true, only returns result status, not the full result object. Default is False. Kwargs: - results_dir: The directory where the results are stored in dispatch + results_dir: The directory where the results are stored in dispatch id named folders. - workflow_output: Whether to return the workflow output. + workflow_output: Whether to return the workflow output. Defaults to True. - intermediate_outputs: Whether to return all intermediate outputs in the + intermediate_outputs: Whether to return all intermediate outputs in the compute graph. Defaults to True. - sublattice_results: Whether to recursively retrieve sublattice results. + sublattice_results: Whether to recursively retrieve sublattice results. Default is True. - qelectron_db: Whether to load the bytes data of qelectron_db. Default + qelectron_db: Whether to load the bytes data of qelectron_db. Default is False. completed_check : Whether to only include completed results. started_before: Only return results that started before this time. @@ -588,12 +588,12 @@ def get_all_results(dispatch_id_lst: list[str] = [], for _id in dispatch_id_lst: try: # Depends on get_result method, works as normal - result = get_result(_id, wait, dispatcher_addr, status_only, - results_dir, workflow_output, - intermediate_outputs, sublattice_results, + result = get_result(_id, wait, dispatcher_addr, status_only, + results_dir, workflow_output, + intermediate_outputs, sublattice_results, qelectron_db) - - # Does not follow PEP8 for better readability of conditional logic. + + # Does not follow PEP8 for better readability of conditional logic. # The logic could be simplified. This is it currently as follows: # If the result does not meet the query, then exclude it from the # list. started_before and started_after are datetime objects that @@ -602,7 +602,7 @@ def get_all_results(dispatch_id_lst: list[str] = [], # for the end time. If the result does not meet the query, then # it is excluded from the list. - # Check if result has been completed to avoid errors (if not + # Check if result has been completed to avoid errors (if not # completed, there is no end time) if result.status != "COMPLETED": if (not ( @@ -610,7 +610,7 @@ def get_all_results(dispatch_id_lst: list[str] = [], (started_after is not None and result.start_time < started_after) )) and (not completed_check): result_lst.append({"dispatch_id": _id, "result": result}) - else: + else: if not ((started_before is not None and result.start_time > started_before) or \ (started_after is not None and result.start_time < started_after) or \ (completed_before is not None and result.end_time > completed_before) or \ @@ -618,10 +618,10 @@ def get_all_results(dispatch_id_lst: list[str] = [], result_lst.append({"dispatch_id": _id, "result": result}) - + # If the record does not exist, a MissingLatticeRecordError is raised, # which is caught and ignored except MissingLatticeRecordError: continue - + return result_lst From 49915cb8930ebdb64837a4ccc69246b6874def6c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 7 Jan 2024 16:57:04 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- covalent/_results_manager/results_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/covalent/_results_manager/results_manager.py b/covalent/_results_manager/results_manager.py index 95f427eb3..5ed5eee61 100644 --- a/covalent/_results_manager/results_manager.py +++ b/covalent/_results_manager/results_manager.py @@ -19,13 +19,13 @@ import contextlib import os +from datetime import datetime from pathlib import Path from typing import Dict, List, Optional from furl import furl from requests.adapters import HTTPAdapter from urllib3.util import Retry -from datetime import datetime from .._api.apiclient import CovalentAPIClient from .._serialize.common import load_asset