Skip to content

Commit

Permalink
add DOWNLOAD staging action
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Dec 10, 2024
1 parent 9b85293 commit 08fcad3
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 97 deletions.
11 changes: 7 additions & 4 deletions examples/05_task_input_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,16 @@
# Here we don't use dict initialization.
td = rp.TaskDescription()
td.executable = '/usr/bin/wc'
td.arguments = ['-c', 'input.dat']
td.arguments = ['-c', 'input_1.dat', 'input_2.dat']
# td.input_staging = ['input.dat']

# this is a shortcut for:
td.input_staging = {'source': 'client:///input.dat',
'target': 'task:///input.dat',
'action': rp.TRANSFER}
td.input_staging = [{'source': 'client:///input.dat',
'target': 'task:///input_1.dat',
'action': rp.TRANSFER},
{'source': 'https://1.1.1.1/',
'target': 'task:///input_2.dat',
'action': rp.DOWNLOAD}]
tds.append(td)
report.progress()
report.ok('>>ok\n')
Expand Down
68 changes: 16 additions & 52 deletions src/radical/pilot/agent/staging_input/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(self, cfg, session):
def initialize(self):

self._pwd = os.getcwd()
self._stager = rpu.StagingHelper(log=self._log)

self.register_input(rps.AGENT_STAGING_INPUT_PENDING,
rpc.AGENT_STAGING_INPUT_QUEUE, self.work)
Expand All @@ -67,7 +68,8 @@ def _work(self, tasks):
actionables = list()
for sd in task['description'].get('input_staging', []):

if sd['action'] in [rpc.LINK, rpc.COPY, rpc.MOVE, rpc.TARBALL]:
if sd['action'] in [rpc.LINK, rpc.COPY, rpc.MOVE,
rpc.TARBALL, rpc.DOWNLOAD]:
actionables.append(sd)

if actionables:
Expand Down Expand Up @@ -147,11 +149,12 @@ def _handle_task_staging(self, task, actionables):
did = sd['uid']
src = sd['source']
tgt = sd['target']
flags = sd.get('flags', 0)

self._prof.prof('staging_in_start', uid=uid, msg=did)

# agent stager only handles local actions
if action not in [rpc.COPY, rpc.LINK, rpc.MOVE]:
if action not in [rpc.COPY, rpc.LINK, rpc.MOVE, rpc.DOWNLOAD]:
self._prof.prof('staging_in_skip', uid=uid, msg=did)
continue

Expand All @@ -169,60 +172,13 @@ def _handle_task_staging(self, task, actionables):
elif os.path.exists(tgt.strip()) and os.path.isdir(tgt.strip()):
tgt = os.path.join(tgt, os.path.basename(src))


src = complete_url(src, src_context, self._log)
tgt = complete_url(tgt, tgt_context, self._log)

# Currently, we use the same schema for files and folders.
assert tgt.schema == 'file', 'staging tgt must be file://'

if action in [rpc.COPY, rpc.LINK, rpc.MOVE]:
assert src.schema == 'file', 'staging src expected as file://'

# implicitly create target dir if needed - but only for local ops
if action != rpc.TRANSFER:
tgtdir = os.path.dirname(tgt.path)
if tgtdir != task_sandbox.path:
self._log.debug("mkdir %s", tgtdir)
ru.rec_makedir(tgtdir)

if action == rpc.COPY:
try:
shutil.copytree(src.path, tgt.path)
except OSError as exc:
if exc.errno == errno.ENOTDIR:
shutil.copy(src.path, tgt.path)
else:
raise

elif action == rpc.LINK:

# Fix issue/1513 if link source is file and target is folder.
# should support POSIX standard where link is created
# with the same name as the source
if os.path.isfile(src.path) and os.path.isdir(tgt.path):
os.symlink(src.path,
'%s/%s' % (tgt.path, os.path.basename(src.path)))

else:
os.symlink(src.path, tgt.path)

elif action == rpc.MOVE:
shutil.move(src.path, tgt.path)

elif action == rpc.TRANSFER:

# NOTE: TRANSFER directives don't arrive here right now.
# FIXME: we only handle srm staging right now, and only for
# a specific target proxy. Other TRANSFER directives are
# left to tmgr input staging. We should use SAGA to
# attempt all staging ops which do not involve the client
# machine.
self._log.error('no transfer for %s -> %s', src, tgt)
self._prof.prof('staging_in_fail', uid=uid, msg=did)
raise NotImplementedError('unsupported transfer %s' % src)

elif action == rpc.TARBALL:
assert tgt.schema == 'file', 'staging tgt expected as file://'

if action == rpc.TARBALL:

# If somethig was staged via the tarball method, the tarball is
# extracted and then removed from the task folder. The target
Expand All @@ -238,6 +194,14 @@ def _handle_task_staging(self, task, actionables):
# FIXME: make tarball removal dependent on debug settings
# os.remove(os.path.dirname(tgt.path) + '/' + uid + '.tar')

else:

self._stager.handle_staging_directive({'source': src,
'target': tgt,
'action': action,
'flags' : flags})


self._prof.prof('staging_in_stop', uid=uid, msg=did)

# all staging is done -- pass on to the scheduler
Expand Down
36 changes: 6 additions & 30 deletions src/radical/pilot/agent/staging_output/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, cfg, session):
def initialize(self):

self._pwd = os.getcwd()
self._stager = rpu.StagingHelper(log=self._log)

self.register_input(rps.AGENT_STAGING_OUTPUT_PENDING,
rpc.AGENT_STAGING_OUTPUT_QUEUE, self.work)
Expand Down Expand Up @@ -311,37 +312,12 @@ def _handle_task_staging(self, task, actionables):
if action in [rpc.COPY, rpc.LINK, rpc.MOVE]:
assert tgt.schema == 'file', 'staging tgt expected as file://'

# implicitly create target dir if needed - but only for local ops
if action != rpc.TRANSFER:
tgtdir = os.path.dirname(tgt.path)
if tgtdir != task_sandbox.path:
self._log.debug("mkdir %s", tgtdir)
ru.rec_makedir(tgtdir)
self._stager.handle_staging_directive({'source': src,
'target': tgt,
'action': action,
'flags' : flags})


if action == rpc.COPY:
try:
shutil.copytree(src.path, tgt.path)
except OSError as exc:
if exc.errno == errno.ENOTDIR:
shutil.copy(src.path, tgt.path)
else:
raise

elif action == rpc.LINK:
# Fix issue/1513 if link source is file and target is folder
# should support POSIX standard where link is created
# with the same name as the source
if os.path.isfile(src.path) and os.path.isdir(tgt.path):
os.symlink(src.path,
os.path.join(tgt.path,
os.path.basename(src.path)))
else: # default behavior
os.symlink(src.path, tgt.path)
elif action == rpc.MOVE: shutil.move(src.path, tgt.path)
elif action == rpc.TRANSFER: pass
# This is currently never executed. Commenting it out.
# Uncomment and implement when uploads directly to remote URLs
# from tasks are supported.
self._prof.prof('staging_out_stop', uid=uid, msg=did)

# all agent staging is done -- pass on to tmgr output staging
Expand Down
1 change: 1 addition & 0 deletions src/radical/pilot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
COPY = 'Copy' # local cp
LINK = 'Link' # local ln -s
MOVE = 'Move' # local mv
DOWNLOAD = 'Download' # remote download by agent
TRANSFER = 'Transfer' # remote transfer from / to client
TARBALL = 'Tarball' # remote staging will be executed using a tarball.

Expand Down
3 changes: 2 additions & 1 deletion src/radical/pilot/task_description.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,11 @@ class TaskDescription(ru.TypedDict):
*Action operators*
- rp.TRANSFER : remote file transfer from `source` URL to `target` URL
- rp.TRANSFER : remote file transfer from `source` to `target` URL (client)
- rp.COPY : local file copy, i.e., not crossing host boundaries
- rp.MOVE : local file move
- rp.LINK : local file symlink
- rp.DOWNLOAD : fetch remote file from `source` URL to `target` URL (agent)
*Flags*
Expand Down
7 changes: 0 additions & 7 deletions src/radical/pilot/tmgr/staging_input/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,17 +318,10 @@ def _handle_task(self, task, actionables):
new_actionables.append(sd)

else:

action = sd['action']
did = sd['uid']
src = sd['source']
tgt = sd['target']

# client stager only handles remote actions
if action not in [rpc.TRANSFER, rpc.TARBALL]:
self._prof.prof('staging_in_skip', uid=uid, msg=did)
continue

src = complete_url(src, src_context, self._log)
tgt = complete_url(tgt, tgt_context, self._log)

Expand Down
22 changes: 19 additions & 3 deletions src/radical/pilot/utils/staging_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import radical.utils as ru

from ..constants import COPY, LINK, MOVE, TRANSFER
from ..constants import COPY, LINK, MOVE, TRANSFER, DOWNLOAD
from ..constants import TARBALL # , CREATE_PARENTS, RECURSIVE


Expand Down Expand Up @@ -42,6 +42,10 @@ def link(self, src, tgt, flags=None):
self._log.debug('link %s %s', src, tgt)
self._backend.link(src, tgt, flags)

def download(self, src, tgt, flags=None):
self._log.debug('download %s %s', src, tgt)
self._backend.download(src, tgt, flags)

def delete(self, tgt, flags=None):
self._log.debug('rm %s', tgt)
self._backend.delete(tgt, flags)
Expand All @@ -55,10 +59,9 @@ def handle_staging_directive(self, sd):
action = sd['action']
src = sd['source']
tgt = sd['target']
uid = sd.get('uid', '')
flags = sd.get('flags', 0)

assert action in [COPY, LINK, MOVE, TRANSFER]
assert action in [COPY, LINK, MOVE, TRANSFER, DOWNLOAD]

self._log.info('%-10s %s', action, src)
self._log.info('%-10s %s', '', tgt)
Expand All @@ -72,6 +75,9 @@ def handle_staging_directive(self, sd):
elif action == MOVE:
self.move(src, tgt, flags)

elif action in [DOWNLOAD]:
self.download(src, tgt, flags)


# ------------------------------------------------------------------------------
#
Expand Down Expand Up @@ -107,6 +113,11 @@ def link(self, src, tgt, flags):
self.mkdir(os.path.dirname(tgt), flags)
os.link(src, tgt)

def download(self, src, tgt, flags):
tgt = ru.Url(tgt).path
self.mkdir(os.path.dirname(tgt), flags)
ru.sh_callout('wget -r %s -O %s' % (src, tgt))

def delete(self, tgt, flags):
tgt = ru.Url(tgt).path
try : os.unlink(tgt)
Expand Down Expand Up @@ -170,6 +181,11 @@ def move(self, src, tgt, flags):
def link(self, src, tgt, flags):
assert self._has_saga

def download(self, src, tgt, flags):
assert self._has_saga

self.copy(src, tgt, flags)


def delete(self, tgt, flags):
assert self._has_saga
Expand Down

0 comments on commit 08fcad3

Please sign in to comment.