From b7d7e201bfea418089541b620b0e494584b347b0 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Fri, 23 Feb 2024 23:55:31 -0500 Subject: [PATCH 01/14] reworked Darshan enabling procedure (added caching and decorator) --- src/radical/entk/utils/provenance.py | 105 ++++++++++++++++++++------- 1 file changed, 78 insertions(+), 27 deletions(-) diff --git a/src/radical/entk/utils/provenance.py b/src/radical/entk/utils/provenance.py index 4d05a12f..0a9efe67 100644 --- a/src/radical/entk/utils/provenance.py +++ b/src/radical/entk/utils/provenance.py @@ -8,48 +8,99 @@ import radical.utils as ru -from .. import Pipeline, Task +from .. import Pipeline, Stage, Task +_darshan_activation_cmds = None _darshan_env = None +_darshan_runtime_root = None # ------------------------------------------------------------------------------ # -def enable_darshan(pipelines: List[Pipeline], - darshan_runtime_root: Optional[str] = None, - modules: Optional[List[str]] = None) -> None: - - if darshan_runtime_root: - if not darshan_runtime_root.startswith('/'): - raise RuntimeError('Path for the darshan installation ' - 'should be an absolute path ' +def cache_darshan_env(darshan_runtime_root: Optional[str] = None, + modules: Optional[List[str]] = None, + env: Optional[Dict[str, str]] = None) -> None: + global _darshan_runtime_root + + if _darshan_runtime_root is None: + if (darshan_runtime_root + and not darshan_runtime_root.startswith('$') + and not darshan_runtime_root.startswith('/')): + raise RuntimeError('Darshan root directory should be set with ' + 'either env variable or an absolute path ' f'(provided path: {darshan_runtime_root})') - else: - darshan_runtime_root = '$DARSHAN_RUNTIME_ROOT' + _darshan_runtime_root = darshan_runtime_root or '$DARSHAN_RUNTIME_ROOT' + global _darshan_activation_cmds global _darshan_env - darshan_activation_cmds = [] - for module in modules or []: - darshan_activation_cmds.append(f'module load {module}') - _darshan_env = ru.env_prep(pre_exec_cached=darshan_activation_cmds) + if _darshan_activation_cmds is None: - for pipeline in pipelines: - for stage in pipeline.stages: + _darshan_activation_cmds = [] + for module in modules or []: + _darshan_activation_cmds.append(f'module load {module}') + for k, v in (env or {}).items(): + _darshan_activation_cmds.append(f'export {k.upper()}={v}') + + _darshan_env = ru.env_prep(pre_exec_cached=_darshan_activation_cmds) + + +# ------------------------------------------------------------------------------ +# decorator to enable darshan for function that generates Pipeline, Stage, Task +def darshan(func, + darshan_runtime_root: Optional[str] = None, + modules: Optional[List[str]] = None, + env: Optional[Dict[str, str]] = None): + def wrapper(*args, **kwargs): + return enable_darshan(func(*args, **kwargs), + darshan_runtime_root=darshan_runtime_root, + modules=modules, + env=env) + + return wrapper + + +# ------------------------------------------------------------------------------ +# +def enable_darshan(pst_obj: Union[Pipeline, Stage, Task], + darshan_runtime_root: Optional[str] = None, + modules: Optional[List[str]] = None, + env: Optional[Dict[str, str]] = None + ) -> Union[Pipeline, Stage, Task]: + if not isinstance(pst_obj, (Pipeline, Stage, Task)): + raise TypeError('Provide PST object to enable Darshan') + + cache_darshan_env(darshan_runtime_root, modules, env) + + def _enable_darshan(src_task: Task): + darshan_log_dir = '${RP_TASK_SANDBOX}/${RP_TASK_ID}_darshan' + darshan_enable = (f'LD_PRELOAD="{_darshan_runtime_root}' + '/lib/libdarshan.so" ') + + if src_task.cpu_reqs.cpu_processes == 1: + darshan_enable += 'DARSHAN_ENABLE_NONMPI=1 ' + + src_task.executable = darshan_enable + src_task.executable + src_task.pre_launch += [f'mkdir -p {darshan_log_dir}'] + src_task.pre_exec.extend( + _darshan_activation_cmds + + [f'export DARSHAN_LOG_DIR_PATH={darshan_log_dir}']) + + if isinstance(pst_obj, Pipeline): + for stage in pst_obj.stages: for task in stage.tasks: + _enable_darshan(task) + return pst_obj - darshan_log_dir = '${RP_TASK_SANDBOX}/${RP_TASK_ID}_darshan' - darshan_enable = (f'LD_PRELOAD="{darshan_runtime_root}' - '/lib/libdarshan.so" ') + elif isinstance(pst_obj, Stage): + for task in pst_obj.tasks: + _enable_darshan(task) + return pst_obj - if task.cpu_reqs.cpu_processes == 1: - darshan_enable += 'DARSHAN_ENABLE_NONMPI=1 ' + elif isinstance(pst_obj, Task): + _enable_darshan(pst_obj) - task.executable = darshan_enable + task.executable - task.pre_launch += [f'mkdir -p {darshan_log_dir}'] - task.pre_exec.extend( - darshan_activation_cmds + - [f'export DARSHAN_LOG_DIR_PATH={darshan_log_dir}']) + return pst_obj # ------------------------------------------------------------------------------ From ea3c6d335a8f9b2fa443a0bc1aeb94843a2d38a3 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Sat, 24 Feb 2024 00:09:00 -0500 Subject: [PATCH 02/14] added protection against double-enabling and empty executable --- src/radical/entk/utils/provenance.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/radical/entk/utils/provenance.py b/src/radical/entk/utils/provenance.py index 0a9efe67..da05a363 100644 --- a/src/radical/entk/utils/provenance.py +++ b/src/radical/entk/utils/provenance.py @@ -11,8 +11,8 @@ from .. import Pipeline, Stage, Task _darshan_activation_cmds = None -_darshan_env = None -_darshan_runtime_root = None +_darshan_env = None +_darshan_runtime_root = None # ------------------------------------------------------------------------------ @@ -20,6 +20,7 @@ def cache_darshan_env(darshan_runtime_root: Optional[str] = None, modules: Optional[List[str]] = None, env: Optional[Dict[str, str]] = None) -> None: + global _darshan_runtime_root if _darshan_runtime_root is None: @@ -56,7 +57,6 @@ def wrapper(*args, **kwargs): darshan_runtime_root=darshan_runtime_root, modules=modules, env=env) - return wrapper @@ -67,20 +67,28 @@ def enable_darshan(pst_obj: Union[Pipeline, Stage, Task], modules: Optional[List[str]] = None, env: Optional[Dict[str, str]] = None ) -> Union[Pipeline, Stage, Task]: + if not isinstance(pst_obj, (Pipeline, Stage, Task)): raise TypeError('Provide PST object to enable Darshan') cache_darshan_env(darshan_runtime_root, modules, env) def _enable_darshan(src_task: Task): + + if not src_task.executable: + return + elif 'libdarshan.so' in src_task.executable: + # Darshan is already enabled + return + darshan_log_dir = '${RP_TASK_SANDBOX}/${RP_TASK_ID}_darshan' - darshan_enable = (f'LD_PRELOAD="{_darshan_runtime_root}' - '/lib/libdarshan.so" ') + darshan_enable = (f'LD_PRELOAD="{_darshan_runtime_root}' + '/lib/libdarshan.so" ') if src_task.cpu_reqs.cpu_processes == 1: darshan_enable += 'DARSHAN_ENABLE_NONMPI=1 ' - src_task.executable = darshan_enable + src_task.executable + src_task.executable = darshan_enable + src_task.executable src_task.pre_launch += [f'mkdir -p {darshan_log_dir}'] src_task.pre_exec.extend( _darshan_activation_cmds + From a3476af6ed5c87dc079847b3c57caf8f04906c19 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Sat, 24 Feb 2024 00:18:06 -0500 Subject: [PATCH 03/14] updated Darshan example according to the latest changes --- examples/misc/darshan_enabled.py | 43 ++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/examples/misc/darshan_enabled.py b/examples/misc/darshan_enabled.py index a5f516fa..daec8f4d 100644 --- a/examples/misc/darshan_enabled.py +++ b/examples/misc/darshan_enabled.py @@ -5,7 +5,10 @@ import radical.entk as re import radical.pilot as rp -from radical.entk.utils.provenance import enable_darshan, get_provenance_graph +from radical.entk.utils.provenance import (cache_darshan_env, + darshan, + enable_darshan, + get_provenance_graph) RESOURCE_DESCRIPTION = { # https://radicalpilot.readthedocs.io/en/stable/supported/polaris.html @@ -19,9 +22,11 @@ os.environ['RADICAL_LOG_LVL'] = 'DEBUG' os.environ['RADICAL_REPORT'] = 'TRUE' +TASK_01_OUTPUT = 'output_01.dat' + # pylint: disable=anomalous-backslash-in-string -def get_stages(): +def get_stage_0(): # hello-RP task task_00 = re.Task({ @@ -33,42 +38,48 @@ def get_stages(): }) # R/W data - output_01 = 'output_01.dat' task_01 = re.Task({ 'executable' : '/bin/sh', - 'arguments' : ['-c', f'cat input.dat | wc > {output_01}'], + 'arguments' : ['-c', f'cat input.dat | wc > {TASK_01_OUTPUT}'], 'upload_input_data': ['/etc/passwd > input.dat'], - 'copy_output_data' : [f'{output_01} > $SHARED/{output_01}'] + 'copy_output_data' : [f'{TASK_01_OUTPUT} > $SHARED/{TASK_01_OUTPUT}'] }) stage_0 = re.Stage() - stage_0.add_tasks([task_00, task_01]) + # --- enable Darshan for task "task_01" only + stage_0.add_tasks([task_00, enable_darshan(task_01)]) + return stage_0 + + +# --- enable Darshan for the whole "stage_1" using decorator +@darshan +def get_stage_1(): # R/W data and task depends on the task from the previous stage task_10 = re.Task({ 'executable' : '/bin/sh', 'arguments' : ['-c', - f"sed -r 's/\s+//g' {output_01} " + # noqa: W605 + f"sed -r 's/\s+//g' {TASK_01_OUTPUT} " + # noqa: W605 '| grep -o . | sort | uniq -c > output_10.dat'], - 'copy_input_data': [f'$SHARED/{output_01} > {output_01}'] + 'copy_input_data': [f'$SHARED/{TASK_01_OUTPUT} > {TASK_01_OUTPUT}'] }) stage_1 = re.Stage() stage_1.add_tasks([task_10]) - - return [stage_0, stage_1] + return stage_1 def main(): + + cache_darshan_env(darshan_runtime_root='$DARSHAN_RUNTIME_ROOT', + modules=['e4s/22.08/PrgEnv-gnu', + 'darshan-runtime', + 'darshan-util']) + pipeline = re.Pipeline() - pipeline.add_stages(get_stages()) + pipeline.add_stages([get_stage_0(), get_stage_1()]) workflow = [pipeline] - enable_darshan(pipelines=workflow, - modules=['e4s/22.08/PrgEnv-gnu', - 'darshan-runtime', - 'darshan-util']) - amgr = re.AppManager() amgr.resource_desc = RESOURCE_DESCRIPTION amgr.workflow = workflow From 6a11ee5f65beddc3b7e7848c31ddc5dc9c19e896 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Sat, 24 Feb 2024 10:40:31 -0500 Subject: [PATCH 04/14] reorganize darshan and provenance modules (move into `/tools`) --- examples/misc/darshan_enabled.py | 8 +- src/radical/entk/tools/__init__.py | 6 ++ .../{utils/provenance.py => tools/darshan.py} | 73 ++---------------- src/radical/entk/tools/provenance.py | 77 +++++++++++++++++++ 4 files changed, 95 insertions(+), 69 deletions(-) create mode 100644 src/radical/entk/tools/__init__.py rename src/radical/entk/{utils/provenance.py => tools/darshan.py} (71%) create mode 100644 src/radical/entk/tools/provenance.py diff --git a/examples/misc/darshan_enabled.py b/examples/misc/darshan_enabled.py index daec8f4d..c368604c 100644 --- a/examples/misc/darshan_enabled.py +++ b/examples/misc/darshan_enabled.py @@ -5,10 +5,10 @@ import radical.entk as re import radical.pilot as rp -from radical.entk.utils.provenance import (cache_darshan_env, - darshan, - enable_darshan, - get_provenance_graph) +from radical.entk.tools import (cache_darshan_env, + darshan, + enable_darshan, + get_provenance_graph) RESOURCE_DESCRIPTION = { # https://radicalpilot.readthedocs.io/en/stable/supported/polaris.html diff --git a/src/radical/entk/tools/__init__.py b/src/radical/entk/tools/__init__.py new file mode 100644 index 00000000..1fcf4655 --- /dev/null +++ b/src/radical/entk/tools/__init__.py @@ -0,0 +1,6 @@ + +from .darshan import cache_darshan_env, darshan, enable_darshan +from .darshan import annotate_task_with_darshan + +from .provenance import extract_provenance_graph, get_provenance_graph + diff --git a/src/radical/entk/utils/provenance.py b/src/radical/entk/tools/darshan.py similarity index 71% rename from src/radical/entk/utils/provenance.py rename to src/radical/entk/tools/darshan.py index da05a363..a0020cf2 100644 --- a/src/radical/entk/utils/provenance.py +++ b/src/radical/entk/tools/darshan.py @@ -10,6 +10,8 @@ from .. import Pipeline, Stage, Task +DARSHAN_LOG_DIR = '%(sandbox)s/darshan_logs' + _darshan_activation_cmds = None _darshan_env = None _darshan_runtime_root = None @@ -81,7 +83,7 @@ def _enable_darshan(src_task: Task): # Darshan is already enabled return - darshan_log_dir = '${RP_TASK_SANDBOX}/${RP_TASK_ID}_darshan' + darshan_log_dir = DARSHAN_LOG_DIR % {'sandbox': '${RP_TASK_SANDBOX}'} darshan_enable = (f'LD_PRELOAD="{_darshan_runtime_root}' '/lib/libdarshan.so" ') @@ -143,79 +145,20 @@ def annotate_task_with_darshan(task: Task) -> None: inputs = set() outputs = set() - for log in glob.glob(f'{task.path}/{task.uid}_darshan/*'): + for log in glob.glob((DARSHAN_LOG_DIR % {'sandbox': task.path}) + '/*'): inputs.update(get_parsed_data(log, ['POSIX_BYTES_READ', 'STDIO_OPENS'])) outputs.update(get_parsed_data(log, 'POSIX_BYTES_WRITTEN')) arguments = ' '.join(task.arguments) if '>' in arguments: - outputs.add(arguments.split('>')[1].split(';')[0].strip()) + output = arguments.split('>')[1].split(';')[0].strip() + if not output.startswith('/') and not output.startswith('$'): + output = task.path + '/' + output + outputs.add(output) task.annotate(inputs=sorted(inputs), outputs=sorted(outputs)) -# ------------------------------------------------------------------------------ -# -def get_provenance_graph(pipelines: Union[Pipeline, List[Pipeline]], - output_file: Optional[str] = None) -> Dict: - """ - Using UIDs of all entities to build a workflow provenance graph. - """ - - graph = {} - - for pipeline in ru.as_list(pipelines): - graph[pipeline.uid] = {} - for stage in pipeline.stages: - graph[pipeline.uid][stage.uid] = {} - for task in stage.tasks: - annotate_task_with_darshan(task) - graph[pipeline.uid][stage.uid][task.uid] = \ - task.annotations.as_dict() - - if output_file: - if not output_file.endswith('.json'): - output_file += '.json' - ru.write_json(graph, output_file) - - return graph - - -# ------------------------------------------------------------------------------ -# -def extract_provenance_graph(session_json: str, - output_file: Optional[str] = None) -> Dict: - """ - Using session JSON file to build a workflow provenance graph. - """ - - session_entities = ru.read_json(session_json) - - if not session_entities.get('task'): - raise ValueError('No task entities in provided session') - - graph = {} - - for task in session_entities['task']: - task_uid, _, stage_uid, _, pipeline_uid, _ = task['name'].split(',') - graph.\ - setdefault(pipeline_uid, {}).\ - setdefault(stage_uid, {}).\ - setdefault(task_uid, - task['description']['metadata'].get('data') or {}) - - for pipeline_uid in graph: - for stage_uid in graph[pipeline_uid]: - graph[pipeline_uid][stage_uid].sort() - - if output_file: - if not output_file.endswith('.json'): - output_file += '.json' - ru.write_json(graph, output_file) - - return graph - - # ------------------------------------------------------------------------------ diff --git a/src/radical/entk/tools/provenance.py b/src/radical/entk/tools/provenance.py new file mode 100644 index 00000000..efc99b3b --- /dev/null +++ b/src/radical/entk/tools/provenance.py @@ -0,0 +1,77 @@ + +__copyright__ = 'Copyright 2024, The RADICAL-Cybertools Team' +__license__ = 'MIT' + +from typing import Optional, Dict, List, Union + +import radical.utils as ru + +from .. import Pipeline + +from .darshan import annotate_task_with_darshan + + +# ------------------------------------------------------------------------------ +# +def get_provenance_graph(pipelines: Union[Pipeline, List[Pipeline]], + output_file: Optional[str] = None) -> Dict: + """ + Using UIDs of all entities to build a workflow provenance graph. + """ + + graph = {} + + for pipeline in ru.as_list(pipelines): + graph[pipeline.uid] = {} + for stage in pipeline.stages: + graph[pipeline.uid][stage.uid] = {} + for task in stage.tasks: + annotate_task_with_darshan(task) + graph[pipeline.uid][stage.uid][task.uid] = \ + task.annotations.as_dict() + + if output_file: + if not output_file.endswith('.json'): + output_file += '.json' + ru.write_json(graph, output_file) + + return graph + + +# ------------------------------------------------------------------------------ +# +def extract_provenance_graph(session_json: str, + output_file: Optional[str] = None) -> Dict: + """ + Using session JSON file to build a workflow provenance graph. + """ + + session_entities = ru.read_json(session_json) + + if not session_entities.get('task'): + raise ValueError('No task entities in provided session') + + graph = {} + + for task in session_entities['task']: + task_uid, _, stage_uid, _, pipeline_uid, _ = task['name'].split(',') + graph.\ + setdefault(pipeline_uid, {}).\ + setdefault(stage_uid, {}).\ + setdefault(task_uid, + task['description']['metadata'].get('data') or {}) + + for pipeline_uid in graph: + for stage_uid in graph[pipeline_uid]: + graph[pipeline_uid][stage_uid].sort() + + if output_file: + if not output_file.endswith('.json'): + output_file += '.json' + ru.write_json(graph, output_file) + + return graph + + +# ------------------------------------------------------------------------------ + From 837e3c61eb8bd707030642121bdbeeeae811c1c5 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Sat, 24 Feb 2024 11:02:08 -0500 Subject: [PATCH 05/14] returned back support for list of Pipelines to enable Darshan --- src/radical/entk/tools/darshan.py | 32 +++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/src/radical/entk/tools/darshan.py b/src/radical/entk/tools/darshan.py index a0020cf2..63f1cca9 100644 --- a/src/radical/entk/tools/darshan.py +++ b/src/radical/entk/tools/darshan.py @@ -64,14 +64,18 @@ def wrapper(*args, **kwargs): # ------------------------------------------------------------------------------ # -def enable_darshan(pst_obj: Union[Pipeline, Stage, Task], +def enable_darshan(pst: Union[Pipeline, Stage, Task, List[Pipeline]], darshan_runtime_root: Optional[str] = None, modules: Optional[List[str]] = None, env: Optional[Dict[str, str]] = None ) -> Union[Pipeline, Stage, Task]: - if not isinstance(pst_obj, (Pipeline, Stage, Task)): - raise TypeError('Provide PST object to enable Darshan') + if not pst: + raise ValueError('PST object is not provided') + elif isinstance(pst, list) and not isinstance(pst[0], Pipeline): + raise TypeError('List of Pipelines is not provided') + elif not isinstance(pst, (Pipeline, Stage, Task)): + raise TypeError('Non PST object provided') cache_darshan_env(darshan_runtime_root, modules, env) @@ -96,21 +100,25 @@ def _enable_darshan(src_task: Task): _darshan_activation_cmds + [f'export DARSHAN_LOG_DIR_PATH={darshan_log_dir}']) - if isinstance(pst_obj, Pipeline): - for stage in pst_obj.stages: + if isinstance(pst, list): + for pipeline in pst: + for stage in pipeline.stages: + for task in stage.tasks: + _enable_darshan(task) + + elif isinstance(pst, Pipeline): + for stage in pst.stages: for task in stage.tasks: _enable_darshan(task) - return pst_obj - elif isinstance(pst_obj, Stage): - for task in pst_obj.tasks: + elif isinstance(pst, Stage): + for task in pst.tasks: _enable_darshan(task) - return pst_obj - elif isinstance(pst_obj, Task): - _enable_darshan(pst_obj) + elif isinstance(pst, Task): + _enable_darshan(pst) - return pst_obj + return pst # ------------------------------------------------------------------------------ From 10ab12267801ed2be44f6638eb853e4dcd0c0591 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Tue, 27 Feb 2024 13:06:30 -0500 Subject: [PATCH 06/14] renamed decorator for Darshan --- examples/misc/darshan_enabled.py | 4 ++-- src/radical/entk/tools/__init__.py | 2 +- src/radical/entk/tools/darshan.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/misc/darshan_enabled.py b/examples/misc/darshan_enabled.py index c368604c..0f5093b1 100644 --- a/examples/misc/darshan_enabled.py +++ b/examples/misc/darshan_enabled.py @@ -6,7 +6,7 @@ import radical.pilot as rp from radical.entk.tools import (cache_darshan_env, - darshan, + with_darshan, enable_darshan, get_provenance_graph) @@ -52,7 +52,7 @@ def get_stage_0(): # --- enable Darshan for the whole "stage_1" using decorator -@darshan +@with_darshan def get_stage_1(): # R/W data and task depends on the task from the previous stage diff --git a/src/radical/entk/tools/__init__.py b/src/radical/entk/tools/__init__.py index 1fcf4655..a1efecda 100644 --- a/src/radical/entk/tools/__init__.py +++ b/src/radical/entk/tools/__init__.py @@ -1,5 +1,5 @@ -from .darshan import cache_darshan_env, darshan, enable_darshan +from .darshan import cache_darshan_env, with_darshan, enable_darshan from .darshan import annotate_task_with_darshan from .provenance import extract_provenance_graph, get_provenance_graph diff --git a/src/radical/entk/tools/darshan.py b/src/radical/entk/tools/darshan.py index 63f1cca9..dfcefdc8 100644 --- a/src/radical/entk/tools/darshan.py +++ b/src/radical/entk/tools/darshan.py @@ -50,10 +50,10 @@ def cache_darshan_env(darshan_runtime_root: Optional[str] = None, # ------------------------------------------------------------------------------ # decorator to enable darshan for function that generates Pipeline, Stage, Task -def darshan(func, - darshan_runtime_root: Optional[str] = None, - modules: Optional[List[str]] = None, - env: Optional[Dict[str, str]] = None): +def with_darshan(func, + darshan_runtime_root: Optional[str] = None, + modules: Optional[List[str]] = None, + env: Optional[Dict[str, str]] = None): def wrapper(*args, **kwargs): return enable_darshan(func(*args, **kwargs), darshan_runtime_root=darshan_runtime_root, From 75486a358b5c5bcf3c8fec8cab6523af06d85cda Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Tue, 27 Feb 2024 15:13:51 -0500 Subject: [PATCH 07/14] fixed handling of the completed tasks --- src/radical/entk/appman/wfprocessor.py | 8 ++++++++ src/radical/entk/execman/rp/task_manager.py | 2 ++ 2 files changed, 10 insertions(+) diff --git a/src/radical/entk/appman/wfprocessor.py b/src/radical/entk/appman/wfprocessor.py index 63950d40..606dc032 100644 --- a/src/radical/entk/appman/wfprocessor.py +++ b/src/radical/entk/appman/wfprocessor.py @@ -338,6 +338,14 @@ def _update_dequeued_task(self, deq_task): if task.uid != deq_task.uid: continue + # due to the possibility of race condition with + # AppManager._update_task(), we ensure that task + # attributes "path" and "rts_uid" are set. + if not task.path and deq_task.path: + task.path = str(deq_task.path) + if not task.rts_uid and deq_task.rts_uid: + task.rts_uid = str(deq_task.rts_uid) + # If there is no exit code, we assume success # We are only concerned about state of task and not # deq_task diff --git a/src/radical/entk/execman/rp/task_manager.py b/src/radical/entk/execman/rp/task_manager.py index f6f62942..65757440 100644 --- a/src/radical/entk/execman/rp/task_manager.py +++ b/src/radical/entk/execman/rp/task_manager.py @@ -255,12 +255,14 @@ def task_state_cb(rp_task, state): task = create_task_from_rp(rp_task, self._log, self._prof) + # to AppManager self._advance(task, 'Task', states.COMPLETED, 'cb-to-sync') load_placeholder(task) tdict = task.as_dict() + # to WFprocessor self._zmq_queue['put'].put(qname='completed', msgs=[tdict]) self._log.info('Pushed task %s with state %s to completed', task.uid, task.state) From 33facefc668422cb6738b1ce638550ee1ab91c0f Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Tue, 27 Feb 2024 15:23:44 -0500 Subject: [PATCH 08/14] fixed linting --- src/radical/entk/tools/darshan.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/radical/entk/tools/darshan.py b/src/radical/entk/tools/darshan.py index dfcefdc8..9a80da43 100644 --- a/src/radical/entk/tools/darshan.py +++ b/src/radical/entk/tools/darshan.py @@ -26,9 +26,9 @@ def cache_darshan_env(darshan_runtime_root: Optional[str] = None, global _darshan_runtime_root if _darshan_runtime_root is None: - if (darshan_runtime_root - and not darshan_runtime_root.startswith('$') - and not darshan_runtime_root.startswith('/')): + if (darshan_runtime_root and + not darshan_runtime_root.startswith('$') and + not darshan_runtime_root.startswith('/')): raise RuntimeError('Darshan root directory should be set with ' 'either env variable or an absolute path ' f'(provided path: {darshan_runtime_root})') From e0d241aebfebf542ed6696d8ed105484ea2142c6 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Wed, 28 Feb 2024 15:35:03 -0500 Subject: [PATCH 09/14] fixed `radical-entk-provenance` --- bin/radical-entk-provenance | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/radical-entk-provenance b/bin/radical-entk-provenance index fe975ded..339af95f 100755 --- a/bin/radical-entk-provenance +++ b/bin/radical-entk-provenance @@ -6,7 +6,7 @@ __license__ = 'MIT' import argparse import sys -from radical.entk.utils.provenance import extract_provenance_graph +from radical.entk.tools import extract_provenance_graph # ------------------------------------------------------------------------------ From 599d2233935bbfa7592b466d96c345e6a0362af7 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Wed, 28 Feb 2024 17:38:32 -0500 Subject: [PATCH 10/14] added couple tests for Darshan module --- src/radical/entk/tools/darshan.py | 2 +- tests/test_utils/test_tools.py | 67 +++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 tests/test_utils/test_tools.py diff --git a/src/radical/entk/tools/darshan.py b/src/radical/entk/tools/darshan.py index 9a80da43..4a5b4e06 100644 --- a/src/radical/entk/tools/darshan.py +++ b/src/radical/entk/tools/darshan.py @@ -43,7 +43,7 @@ def cache_darshan_env(darshan_runtime_root: Optional[str] = None, for module in modules or []: _darshan_activation_cmds.append(f'module load {module}') for k, v in (env or {}).items(): - _darshan_activation_cmds.append(f'export {k.upper()}={v}') + _darshan_activation_cmds.append(f'export {k.upper()}="{v}"') _darshan_env = ru.env_prep(pre_exec_cached=_darshan_activation_cmds) diff --git a/tests/test_utils/test_tools.py b/tests/test_utils/test_tools.py new file mode 100644 index 00000000..959060d8 --- /dev/null +++ b/tests/test_utils/test_tools.py @@ -0,0 +1,67 @@ + +from unittest import TestCase + +import radical.entk as re +import radical.entk.tools.darshan as re_darshan + + +# ------------------------------------------------------------------------------ +# +class TestTools(TestCase): + + # -------------------------------------------------------------------------- + # + def test_cache_darshan_env(self): + + with self.assertRaises(RuntimeError): + # should be set with either env variable or an absolute path + re_darshan.cache_darshan_env(darshan_runtime_root='dir_name') + + self.assertIsNone(re_darshan._darshan_activation_cmds) + self.assertIsNone(re_darshan._darshan_env) + self.assertIsNone(re_darshan._darshan_runtime_root) + + re_darshan.cache_darshan_env(darshan_runtime_root='$DARSHAN_ROOT', + modules=['test_module'], + env={'TEST_VAR': 'test_value'}) + + self.assertEqual(re_darshan._darshan_runtime_root, '$DARSHAN_ROOT') + self.assertEqual(re_darshan._darshan_activation_cmds, + ['module load test_module', + 'export TEST_VAR="test_value"']) + + self.assertTrue('TEST_VAR' in re_darshan._darshan_env) + + # -------------------------------------------------------------------------- + # + def test_enable_darshan(self): + + with self.assertRaises(ValueError): + # empty or no object is provided + re_darshan.enable_darshan(pst=None) + + with self.assertRaises(TypeError): + # in case of list - only list of Pipelines is allowed + re_darshan.enable_darshan(pst=[re.Task()]) + + with self.assertRaises(TypeError): + # only PST objects are allowed + re_darshan.enable_darshan(pst='random_string') + + task = re.Task() + task.cpu_reqs = {'cpu_processes': 1} + + task.executable = '' + re_darshan.enable_darshan(task) + # Darshan is not enabled for tasks with no executable + self.assertFalse(task.executable) + + task.executable = 'test_exec' + re_darshan.enable_darshan(task) + # `test_cache_darshan_env` had already activated the env + self.assertTrue('LD_PRELOAD' in task.executable) + # non-MPI task + self.assertTrue('DARSHAN_ENABLE_NONMPI=1' in task.executable) + +# ------------------------------------------------------------------------------ + From 0cb4d9a7ecd79f38bcfb88549de272f2cc266e5b Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Wed, 28 Feb 2024 17:41:34 -0500 Subject: [PATCH 11/14] fixed linting --- tests/test_utils/test_tools.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_utils/test_tools.py b/tests/test_utils/test_tools.py index 959060d8..877fa130 100644 --- a/tests/test_utils/test_tools.py +++ b/tests/test_utils/test_tools.py @@ -1,3 +1,4 @@ +# pylint: disable=protected-access from unittest import TestCase From e9c76510b8d9a74449b7016c1afffcfbf0f339a9 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Tue, 19 Mar 2024 13:57:41 -0400 Subject: [PATCH 12/14] added tests for Darshan --- tests/test_utils/test_tools.py | 90 +++++++++++++++++++++++++++++----- 1 file changed, 77 insertions(+), 13 deletions(-) diff --git a/tests/test_utils/test_tools.py b/tests/test_utils/test_tools.py index 877fa130..db455786 100644 --- a/tests/test_utils/test_tools.py +++ b/tests/test_utils/test_tools.py @@ -1,6 +1,12 @@ # pylint: disable=protected-access -from unittest import TestCase +import os +import shutil +import tempfile + +from unittest import TestCase, mock + +import radical.utils as ru import radical.entk as re import radical.entk.tools.darshan as re_darshan @@ -14,19 +20,19 @@ class TestTools(TestCase): # def test_cache_darshan_env(self): + re_darshan._darshan_activation_cmds = None + re_darshan._darshan_env = None + re_darshan._darshan_runtime_root = None + with self.assertRaises(RuntimeError): # should be set with either env variable or an absolute path re_darshan.cache_darshan_env(darshan_runtime_root='dir_name') - self.assertIsNone(re_darshan._darshan_activation_cmds) - self.assertIsNone(re_darshan._darshan_env) - self.assertIsNone(re_darshan._darshan_runtime_root) - - re_darshan.cache_darshan_env(darshan_runtime_root='$DARSHAN_ROOT', + re_darshan.cache_darshan_env(darshan_runtime_root='$DARSHAN_ROOT_TEST', modules=['test_module'], env={'TEST_VAR': 'test_value'}) - self.assertEqual(re_darshan._darshan_runtime_root, '$DARSHAN_ROOT') + self.assertEqual(re_darshan._darshan_runtime_root, '$DARSHAN_ROOT_TEST') self.assertEqual(re_darshan._darshan_activation_cmds, ['module load test_module', 'export TEST_VAR="test_value"']) @@ -57,12 +63,70 @@ def test_enable_darshan(self): # Darshan is not enabled for tasks with no executable self.assertFalse(task.executable) - task.executable = 'test_exec' - re_darshan.enable_darshan(task) - # `test_cache_darshan_env` had already activated the env - self.assertTrue('LD_PRELOAD' in task.executable) - # non-MPI task - self.assertTrue('DARSHAN_ENABLE_NONMPI=1' in task.executable) + # enable darshan using decorator + @re_darshan.with_darshan + def get_pipeline(): + t1 = re.Task() + t1.cpu_reqs = {'cpu_processes': 1} + t1.executable = 'test_exec1' + t2 = re.Task() + t2.cpu_reqs = {'cpu_processes': 10} + t2.executable = 'test_exec2' + s0 = re.Stage() + s0.add_tasks([t1, t2]) + p0 = re.Pipeline() + p0.add_stages(s0) + return p0 + + pipeline = get_pipeline() + for s in pipeline.stages: + for t in s.tasks: + self.assertTrue('LD_PRELOAD' in t.executable) + + non_mpi_statement = 'DARSHAN_ENABLE_NONMPI=1' in t.executable + if t.cpu_reqs.cpu_processes == 1: + # non-MPI task + self.assertTrue(non_mpi_statement) + else: + self.assertFalse(non_mpi_statement) + + # darshan is already enabled + re_darshan.enable_darshan(t) + self.assertEqual(t.executable.count('LD_PRELOAD'), 1) + + # -------------------------------------------------------------------------- + # + @mock.patch('radical.utils.sh_callout', + return_value=['1:/tmp/test_file.txt\n0:random\n', '', 0]) + def test_annotate_task_with_darshan(self, mocked_sh_callout): + + tmp_dir = tempfile.gettempdir() + log_dir = os.path.join(tmp_dir, 'darshan_logs') + ru.rec_makedir(log_dir) + + # keeping inside log_dir a random file + tempfile.mkstemp(dir=log_dir) + + task = re_darshan.enable_darshan(re.Task({ + 'executable': 'test_exec', + 'arguments' : ['> arg_output.txt'], + 'cpu_reqs' : {'cpu_processes': 1}, + 'sandbox' : tmp_dir, + 'path' : tmp_dir + })) + + re_darshan.annotate_task_with_darshan(task) + + ta = task.annotations + # file "test_file.txt" is presented in both "inputs" and "outputs", + # because the way we mocked method "sh_callout" + self.assertIn('/test_file.txt', ' '.join(ta['inputs'])) + self.assertIn('/test_file.txt', ' '.join(ta['outputs'])) + self.assertIn('/arg_output.txt', ' '.join(ta['outputs'])) + + self.assertNotIn('random', ' '.join(ta['outputs'])) + + shutil.rmtree(log_dir) # ------------------------------------------------------------------------------ From a45c613ce1e8d8403a689e409a6c8cd942db3edf Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Tue, 19 Mar 2024 14:05:31 -0400 Subject: [PATCH 13/14] fix --- src/radical/entk/tools/darshan.py | 5 +++-- tests/test_utils/test_tools.py | 37 ++++++++++++++++--------------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/radical/entk/tools/darshan.py b/src/radical/entk/tools/darshan.py index 4a5b4e06..9b8db59b 100644 --- a/src/radical/entk/tools/darshan.py +++ b/src/radical/entk/tools/darshan.py @@ -72,8 +72,9 @@ def enable_darshan(pst: Union[Pipeline, Stage, Task, List[Pipeline]], if not pst: raise ValueError('PST object is not provided') - elif isinstance(pst, list) and not isinstance(pst[0], Pipeline): - raise TypeError('List of Pipelines is not provided') + elif isinstance(pst, list): + if not isinstance(pst[0], Pipeline): + raise TypeError('List of Pipelines is not provided') elif not isinstance(pst, (Pipeline, Stage, Task)): raise TypeError('Non PST object provided') diff --git a/tests/test_utils/test_tools.py b/tests/test_utils/test_tools.py index db455786..63b86d07 100644 --- a/tests/test_utils/test_tools.py +++ b/tests/test_utils/test_tools.py @@ -65,7 +65,7 @@ def test_enable_darshan(self): # enable darshan using decorator @re_darshan.with_darshan - def get_pipeline(): + def get_pipelines(): t1 = re.Task() t1.cpu_reqs = {'cpu_processes': 1} t1.executable = 'test_exec1' @@ -76,23 +76,24 @@ def get_pipeline(): s0.add_tasks([t1, t2]) p0 = re.Pipeline() p0.add_stages(s0) - return p0 - - pipeline = get_pipeline() - for s in pipeline.stages: - for t in s.tasks: - self.assertTrue('LD_PRELOAD' in t.executable) - - non_mpi_statement = 'DARSHAN_ENABLE_NONMPI=1' in t.executable - if t.cpu_reqs.cpu_processes == 1: - # non-MPI task - self.assertTrue(non_mpi_statement) - else: - self.assertFalse(non_mpi_statement) - - # darshan is already enabled - re_darshan.enable_darshan(t) - self.assertEqual(t.executable.count('LD_PRELOAD'), 1) + return [p0] + + pipelines = get_pipelines() + for p in pipelines: + for s in p.stages: + for t in s.tasks: + self.assertTrue('LD_PRELOAD' in t.executable) + + nonmpi_statement = 'DARSHAN_ENABLE_NONMPI=1' in t.executable + if t.cpu_reqs.cpu_processes == 1: + # non-MPI task + self.assertTrue(nonmpi_statement) + else: + self.assertFalse(nonmpi_statement) + + # darshan is already enabled + re_darshan.enable_darshan(t) + self.assertEqual(t.executable.count('LD_PRELOAD'), 1) # -------------------------------------------------------------------------- # From 56c8485d14032237ae88ff61b5ca888948ee8725 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Wed, 20 Mar 2024 19:09:30 -0400 Subject: [PATCH 14/14] response to comments --- src/radical/entk/tools/darshan.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/radical/entk/tools/darshan.py b/src/radical/entk/tools/darshan.py index 9b8db59b..be7ad757 100644 --- a/src/radical/entk/tools/darshan.py +++ b/src/radical/entk/tools/darshan.py @@ -128,6 +128,9 @@ def get_parsed_data(log: str, target_counters: Union[str, List[str]]) -> set: data = set() + if not target_counters: + return data + grep_patterns = '-e ' + ' -e '.join(ru.as_list(target_counters)) parser_cmd = (f'darshan-parser {log} | grep {grep_patterns} | ' "awk '{print $5\":\"$6}'") @@ -135,10 +138,10 @@ def get_parsed_data(log: str, target_counters: Union[str, List[str]]) -> set: if ret: print(f'[ERROR] Darshan not able to parse "{log}": {err}') else: - for o in out.split('\n'): - if not o: + for line in out.split('\n'): + if not line: continue - value, file = o.split(':') + value, file = line.split(':', 1) try: value = int(value) except ValueError: value = 0 if value > 0 and file.startswith('/'):