Skip to content

Commit

Permalink
merge from devel
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Dec 10, 2024
2 parents 7633a96 + 9b85293 commit d46ae97
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 5 deletions.
6 changes: 3 additions & 3 deletions examples/06_task_output_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@

report.info('\n')
for task in tasks:
report.plain(' * %s: %s, exit: %3s, out: %s\n'
% (task.uid, task.state[:4],
task.exit_code, task.stdout.strip()[:35]))
report.plain(' * %s: %s, exit: %3s, out: %s - %s\n'
% (task.uid, task.state[:4], task.exit_code,
task.stdout.strip()[:35], task.output_files))

# delete the sample input files
report.info('\nresulting data files:\n\n')
Expand Down
10 changes: 9 additions & 1 deletion src/radical/pilot/agent/executing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ def _create_exec_script(self, launcher, task):
ru.rec_makedir(sbox)
self._prof.prof('task_mkdir_done', uid=tid)


# the exec shell script runs the same set of commands for all ranks.
# However, if the ranks need different GPU's assigned, or if either pre-
# or post-exec directives contain per-rank dictionaries, then we switch
Expand Down Expand Up @@ -284,6 +283,10 @@ def _create_exec_script(self, launcher, task):
tmp += self._get_prof('exec_pre')
tmp += self._get_prep_exec(task, n_ranks, sig='pre_exec')

tmp += self._separator
tmp += '# output file detection (i)\n'
tmp += "ls | sort | grep -ve '^%s\\.' > %s.files\n" % (tid, tid)

tmp += self._separator
tmp += '# execute rank\n'
tmp += self._get_prof('rank_start')
Expand All @@ -292,6 +295,11 @@ def _create_exec_script(self, launcher, task):
msg='RP_EXEC_PID=$RP_EXEC_PID:'
'RP_RANK_PID=$RP_RANK_PID')

tmp += self._separator
tmp += '# output file detection (ii)\n'
tmp += 'ls | sort | comm -23 - ' \
"%s.files | grep -ve '^%s\\.' > %s.ofiles\n" % (tid, tid, tid)

tmp += self._separator
tmp += '# post-exec commands\n'
tmp += self._get_prof('exec_post')
Expand Down
11 changes: 11 additions & 0 deletions src/radical/pilot/agent/staging_output/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ def _handle_task_stdio(self, task):
task['description']['metadata'].update(pids)

self._prof.prof('staging_uprof_stop', uid=uid)
self._prof.prof('staging_ofile_start', uid=uid)

task_ofile = '%s/%s.ofiles' % (sbox, uid)
if os.path.isfile(task_ofile):
try:
with ru.ru_open(task_ofile, 'r', errors='ignore') as fin:
task['ofiles'] = [l.strip() for l in fin.readlines()]
except Exception as e:
self._log.error("Pre/Post ofile read failed: `%s`" % e)

self._prof.prof('staging_ofile_stop', uid=uid)


# --------------------------------------------------------------------------
Expand Down
22 changes: 21 additions & 1 deletion src/radical/pilot/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def __init__(self, tmgr, descr, origin):
self._exit_code = None
self._stdout = str()
self._stderr = str()
self._ofiles = None
self._return_value = None
self._exception = None
self._exception_detail = None
Expand Down Expand Up @@ -171,7 +172,8 @@ def _update(self, task_dict, reconnect=False):
for key in ['state', 'stdout', 'stderr', 'exit_code', 'return_value',
'endpoint_fs', 'resource_sandbox', 'session_sandbox',
'pilot', 'pilot_sandbox', 'task_sandbox', 'client_sandbox',
'exception', 'exception_detail', 'slots', 'partition']:
'exception', 'exception_detail', 'slots', 'partition',
'ofiles']:

val = task_dict.get(key, None)
if val is not None:
Expand Down Expand Up @@ -322,6 +324,24 @@ def stderr(self):
return self._stderr


# --------------------------------------------------------------------------
#
@property
def output_files(self):
"""list[str]: A list of output file names.
If this property is queried before the task has reached
'DONE' or 'FAILED' state it will return None.
Warning:
This can be incomplete: the heuristics will not detect files which
start with `<task_id>.`, for example. It will also not detect files
which are not created in the task sandbox.
"""
return self._ofiles


# --------------------------------------------------------------------------
#
@property
Expand Down

0 comments on commit d46ae97

Please sign in to comment.