Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track output files #3276

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/06_task_output_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@

report.info('\n')
for task in tasks:
report.plain(' * %s: %s, exit: %3s, out: %s\n'
% (task.uid, task.state[:4],
task.exit_code, task.stdout.strip()[:35]))
report.plain(' * %s: %s, exit: %3s, out: %s - %s\n'
% (task.uid, task.state[:4], task.exit_code,
task.stdout.strip()[:35], task.output_files))

# delete the sample input files
report.info('\nresulting data files:\n\n')
Expand Down
10 changes: 9 additions & 1 deletion src/radical/pilot/agent/executing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ def _create_exec_script(self, launcher, task):
ru.rec_makedir(sbox)
self._prof.prof('task_mkdir_done', uid=tid)


# the exec shell script runs the same set of commands for all ranks.
# However, if the ranks need different GPU's assigned, or if either pre-
# or post-exec directives contain per-rank dictionaries, then we switch
Expand Down Expand Up @@ -284,6 +283,10 @@ def _create_exec_script(self, launcher, task):
tmp += self._get_prof('exec_pre')
tmp += self._get_prep_exec(task, n_ranks, sig='pre_exec')

tmp += self._separator
tmp += '# output file detection (i)\n'
tmp += "ls | sort | grep -ve '^%s\\.' > %s.files\n" % (tid, tid)

tmp += self._separator
tmp += '# execute rank\n'
tmp += self._get_prof('rank_start')
Expand All @@ -292,6 +295,11 @@ def _create_exec_script(self, launcher, task):
msg='RP_EXEC_PID=$RP_EXEC_PID:'
'RP_RANK_PID=$RP_RANK_PID')

tmp += self._separator
tmp += '# output file detection (ii)\n'
tmp += 'ls | sort | comm -23 - ' \
"%s.files | grep -ve '^%s\\.' > %s.ofiles\n" % (tid, tid, tid)

tmp += self._separator
tmp += '# post-exec commands\n'
tmp += self._get_prof('exec_post')
Expand Down
11 changes: 11 additions & 0 deletions src/radical/pilot/agent/staging_output/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ def _handle_task_stdio(self, task):
task['description']['metadata'].update(pids)

self._prof.prof('staging_uprof_stop', uid=uid)
self._prof.prof('staging_ofile_start', uid=uid)

task_ofile = '%s/%s.ofiles' % (sbox, uid)
if os.path.isfile(task_ofile):
try:
with ru.ru_open(task_ofile, 'r', errors='ignore') as fin:
task['ofiles'] = [l.strip() for l in fin.readlines()]
except Exception as e:
self._log.error("Pre/Post ofile read failed: `%s`" % e)

self._prof.prof('staging_ofile_stop', uid=uid)


# --------------------------------------------------------------------------
Expand Down
22 changes: 21 additions & 1 deletion src/radical/pilot/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def __init__(self, tmgr, descr, origin):
self._exit_code = None
self._stdout = str()
self._stderr = str()
self._ofiles = None
self._return_value = None
self._exception = None
self._exception_detail = None
Expand Down Expand Up @@ -171,7 +172,8 @@ def _update(self, task_dict, reconnect=False):
for key in ['state', 'stdout', 'stderr', 'exit_code', 'return_value',
'endpoint_fs', 'resource_sandbox', 'session_sandbox',
'pilot', 'pilot_sandbox', 'task_sandbox', 'client_sandbox',
'exception', 'exception_detail', 'slots', 'partition']:
'exception', 'exception_detail', 'slots', 'partition',
'ofiles']:

val = task_dict.get(key, None)
if val is not None:
Expand Down Expand Up @@ -322,6 +324,24 @@ def stderr(self):
return self._stderr


# --------------------------------------------------------------------------
#
@property
def output_files(self):
"""list[str]: A list of output file names.

If this property is queried before the task has reached
'DONE' or 'FAILED' state it will return None.

Warning:
This can be incomplete: the heuristics will not detect files which
start with `<task_id>.`, for example. It will also not detect files
which are not created in the task sandbox.

"""
return self._ofiles


# --------------------------------------------------------------------------
#
@property
Expand Down
Loading