diff --git a/examples/05_task_input_data.py b/examples/05_task_input_data.py index 2496e823d..5a1d0bec3 100755 --- a/examples/05_task_input_data.py +++ b/examples/05_task_input_data.py @@ -91,13 +91,16 @@ # Here we don't use dict initialization. td = rp.TaskDescription() td.executable = '/usr/bin/wc' - td.arguments = ['-c', 'input.dat'] + td.arguments = ['-c', 'input_1.dat', 'input_2.dat'] # td.input_staging = ['input.dat'] # this is a shortcut for: - td.input_staging = {'source': 'client:///input.dat', - 'target': 'task:///input.dat', - 'action': rp.TRANSFER} + td.input_staging = [{'source': 'client:///input.dat', + 'target': 'task:///input_1.dat', + 'action': rp.TRANSFER}, + {'source': 'https://1.1.1.1/', + 'target': 'task:///input_2.dat', + 'action': rp.DOWNLOAD}] tds.append(td) report.progress() report.ok('>>ok\n') diff --git a/src/radical/pilot/agent/staging_input/default.py b/src/radical/pilot/agent/staging_input/default.py index 3e0cea009..281118a24 100644 --- a/src/radical/pilot/agent/staging_input/default.py +++ b/src/radical/pilot/agent/staging_input/default.py @@ -41,6 +41,7 @@ def __init__(self, cfg, session): def initialize(self): self._pwd = os.getcwd() + self._stager = rpu.StagingHelper(log=self._log) self.register_input(rps.AGENT_STAGING_INPUT_PENDING, rpc.AGENT_STAGING_INPUT_QUEUE, self.work) @@ -67,7 +68,8 @@ def _work(self, tasks): actionables = list() for sd in task['description'].get('input_staging', []): - if sd['action'] in [rpc.LINK, rpc.COPY, rpc.MOVE, rpc.TARBALL]: + if sd['action'] in [rpc.LINK, rpc.COPY, rpc.MOVE, + rpc.TARBALL, rpc.DOWNLOAD]: actionables.append(sd) if actionables: @@ -147,11 +149,12 @@ def _handle_task_staging(self, task, actionables): did = sd['uid'] src = sd['source'] tgt = sd['target'] + flags = sd.get('flags', 0) self._prof.prof('staging_in_start', uid=uid, msg=did) # agent stager only handles local actions - if action not in [rpc.COPY, rpc.LINK, rpc.MOVE]: + if action not in [rpc.COPY, rpc.LINK, rpc.MOVE, rpc.DOWNLOAD]: self._prof.prof('staging_in_skip', uid=uid, msg=did) continue @@ -169,60 +172,13 @@ def _handle_task_staging(self, task, actionables): elif os.path.exists(tgt.strip()) and os.path.isdir(tgt.strip()): tgt = os.path.join(tgt, os.path.basename(src)) - src = complete_url(src, src_context, self._log) tgt = complete_url(tgt, tgt_context, self._log) - # Currently, we use the same schema for files and folders. - assert tgt.schema == 'file', 'staging tgt must be file://' - if action in [rpc.COPY, rpc.LINK, rpc.MOVE]: - assert src.schema == 'file', 'staging src expected as file://' - - # implicitly create target dir if needed - but only for local ops - if action != rpc.TRANSFER: - tgtdir = os.path.dirname(tgt.path) - if tgtdir != task_sandbox.path: - self._log.debug("mkdir %s", tgtdir) - ru.rec_makedir(tgtdir) - - if action == rpc.COPY: - try: - shutil.copytree(src.path, tgt.path) - except OSError as exc: - if exc.errno == errno.ENOTDIR: - shutil.copy(src.path, tgt.path) - else: - raise - - elif action == rpc.LINK: - - # Fix issue/1513 if link source is file and target is folder. - # should support POSIX standard where link is created - # with the same name as the source - if os.path.isfile(src.path) and os.path.isdir(tgt.path): - os.symlink(src.path, - '%s/%s' % (tgt.path, os.path.basename(src.path))) - - else: - os.symlink(src.path, tgt.path) - - elif action == rpc.MOVE: - shutil.move(src.path, tgt.path) - - elif action == rpc.TRANSFER: - - # NOTE: TRANSFER directives don't arrive here right now. - # FIXME: we only handle srm staging right now, and only for - # a specific target proxy. Other TRANSFER directives are - # left to tmgr input staging. We should use SAGA to - # attempt all staging ops which do not involve the client - # machine. - self._log.error('no transfer for %s -> %s', src, tgt) - self._prof.prof('staging_in_fail', uid=uid, msg=did) - raise NotImplementedError('unsupported transfer %s' % src) - - elif action == rpc.TARBALL: + assert tgt.schema == 'file', 'staging tgt expected as file://' + + if action == rpc.TARBALL: # If somethig was staged via the tarball method, the tarball is # extracted and then removed from the task folder. The target @@ -238,6 +194,14 @@ def _handle_task_staging(self, task, actionables): # FIXME: make tarball removal dependent on debug settings # os.remove(os.path.dirname(tgt.path) + '/' + uid + '.tar') + else: + + self._stager.handle_staging_directive({'source': src, + 'target': tgt, + 'action': action, + 'flags' : flags}) + + self._prof.prof('staging_in_stop', uid=uid, msg=did) # all staging is done -- pass on to the scheduler diff --git a/src/radical/pilot/agent/staging_output/default.py b/src/radical/pilot/agent/staging_output/default.py index 2706dd0b7..e542e0b10 100644 --- a/src/radical/pilot/agent/staging_output/default.py +++ b/src/radical/pilot/agent/staging_output/default.py @@ -45,6 +45,7 @@ def __init__(self, cfg, session): def initialize(self): self._pwd = os.getcwd() + self._stager = rpu.StagingHelper(log=self._log) self.register_input(rps.AGENT_STAGING_OUTPUT_PENDING, rpc.AGENT_STAGING_OUTPUT_QUEUE, self.work) @@ -311,37 +312,12 @@ def _handle_task_staging(self, task, actionables): if action in [rpc.COPY, rpc.LINK, rpc.MOVE]: assert tgt.schema == 'file', 'staging tgt expected as file://' - # implicitly create target dir if needed - but only for local ops - if action != rpc.TRANSFER: - tgtdir = os.path.dirname(tgt.path) - if tgtdir != task_sandbox.path: - self._log.debug("mkdir %s", tgtdir) - ru.rec_makedir(tgtdir) + self._stager.handle_staging_directive({'source': src, + 'target': tgt, + 'action': action, + 'flags' : flags}) + - if action == rpc.COPY: - try: - shutil.copytree(src.path, tgt.path) - except OSError as exc: - if exc.errno == errno.ENOTDIR: - shutil.copy(src.path, tgt.path) - else: - raise - - elif action == rpc.LINK: - # Fix issue/1513 if link source is file and target is folder - # should support POSIX standard where link is created - # with the same name as the source - if os.path.isfile(src.path) and os.path.isdir(tgt.path): - os.symlink(src.path, - os.path.join(tgt.path, - os.path.basename(src.path))) - else: # default behavior - os.symlink(src.path, tgt.path) - elif action == rpc.MOVE: shutil.move(src.path, tgt.path) - elif action == rpc.TRANSFER: pass - # This is currently never executed. Commenting it out. - # Uncomment and implement when uploads directly to remote URLs - # from tasks are supported. self._prof.prof('staging_out_stop', uid=uid, msg=did) # all agent staging is done -- pass on to tmgr output staging diff --git a/src/radical/pilot/constants.py b/src/radical/pilot/constants.py index d448520e2..0a6b64d6b 100644 --- a/src/radical/pilot/constants.py +++ b/src/radical/pilot/constants.py @@ -82,6 +82,7 @@ COPY = 'Copy' # local cp LINK = 'Link' # local ln -s MOVE = 'Move' # local mv +DOWNLOAD = 'Download' # remote download by agent TRANSFER = 'Transfer' # remote transfer from / to client TARBALL = 'Tarball' # remote staging will be executed using a tarball. diff --git a/src/radical/pilot/task_description.py b/src/radical/pilot/task_description.py index 57a8bbf2a..7b9d84ae1 100644 --- a/src/radical/pilot/task_description.py +++ b/src/radical/pilot/task_description.py @@ -561,10 +561,11 @@ class TaskDescription(ru.TypedDict): *Action operators* - - rp.TRANSFER : remote file transfer from `source` URL to `target` URL + - rp.TRANSFER : remote file transfer from `source` to `target` URL (client) - rp.COPY : local file copy, i.e., not crossing host boundaries - rp.MOVE : local file move - rp.LINK : local file symlink + - rp.DOWNLOAD : fetch remote file from `source` URL to `target` URL (agent) *Flags* diff --git a/src/radical/pilot/tmgr/staging_input/default.py b/src/radical/pilot/tmgr/staging_input/default.py index a9f559878..32088d3c6 100644 --- a/src/radical/pilot/tmgr/staging_input/default.py +++ b/src/radical/pilot/tmgr/staging_input/default.py @@ -318,17 +318,10 @@ def _handle_task(self, task, actionables): new_actionables.append(sd) else: - - action = sd['action'] did = sd['uid'] src = sd['source'] tgt = sd['target'] - # client stager only handles remote actions - if action not in [rpc.TRANSFER, rpc.TARBALL]: - self._prof.prof('staging_in_skip', uid=uid, msg=did) - continue - src = complete_url(src, src_context, self._log) tgt = complete_url(tgt, tgt_context, self._log) diff --git a/src/radical/pilot/utils/staging_helper.py b/src/radical/pilot/utils/staging_helper.py index 5773c9962..841a6112d 100644 --- a/src/radical/pilot/utils/staging_helper.py +++ b/src/radical/pilot/utils/staging_helper.py @@ -4,7 +4,7 @@ import radical.utils as ru -from ..constants import COPY, LINK, MOVE, TRANSFER +from ..constants import COPY, LINK, MOVE, TRANSFER, DOWNLOAD from ..constants import TARBALL # , CREATE_PARENTS, RECURSIVE @@ -42,6 +42,10 @@ def link(self, src, tgt, flags=None): self._log.debug('link %s %s', src, tgt) self._backend.link(src, tgt, flags) + def download(self, src, tgt, flags=None): + self._log.debug('download %s %s', src, tgt) + self._backend.download(src, tgt, flags) + def delete(self, tgt, flags=None): self._log.debug('rm %s', tgt) self._backend.delete(tgt, flags) @@ -55,10 +59,9 @@ def handle_staging_directive(self, sd): action = sd['action'] src = sd['source'] tgt = sd['target'] - uid = sd.get('uid', '') flags = sd.get('flags', 0) - assert action in [COPY, LINK, MOVE, TRANSFER] + assert action in [COPY, LINK, MOVE, TRANSFER, DOWNLOAD] self._log.info('%-10s %s', action, src) self._log.info('%-10s %s', '', tgt) @@ -72,6 +75,9 @@ def handle_staging_directive(self, sd): elif action == MOVE: self.move(src, tgt, flags) + elif action in [DOWNLOAD]: + self.download(src, tgt, flags) + # ------------------------------------------------------------------------------ # @@ -107,6 +113,11 @@ def link(self, src, tgt, flags): self.mkdir(os.path.dirname(tgt), flags) os.link(src, tgt) + def download(self, src, tgt, flags): + tgt = ru.Url(tgt).path + self.mkdir(os.path.dirname(tgt), flags) + ru.sh_callout('wget -r %s -O %s' % (src, tgt)) + def delete(self, tgt, flags): tgt = ru.Url(tgt).path try : os.unlink(tgt) @@ -170,6 +181,11 @@ def move(self, src, tgt, flags): def link(self, src, tgt, flags): assert self._has_saga + def download(self, src, tgt, flags): + assert self._has_saga + + self.copy(src, tgt, flags) + def delete(self, tgt, flags): assert self._has_saga