Skip to content

Commit

Permalink
CalcJob: allow wildcards in stash.source_list paths (#5601)
Browse files Browse the repository at this point in the history
  • Loading branch information
sphuber authored Jul 22, 2022
1 parent c0fdf38 commit e141f97
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
11 changes: 9 additions & 2 deletions .github/system_tests/test_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# pylint: disable=no-name-in-module
"""Tests to run with a running daemon."""
import os
import re
import shutil
import subprocess
import sys
Expand Down Expand Up @@ -434,7 +435,7 @@ def launch_all():
# Delete the temporary directory to test that the stashing functionality will create it if necessary
shutil.rmtree(tmpdir, ignore_errors=True)

source_list = ['output.txt', 'triple_value.tmp']
source_list = ['output.txt', 'triple_value.*']
inputs['metadata']['options']['stash'] = {'target_base': tmpdir, 'source_list': source_list}
_, node = run.get_node(process, **inputs)
assert node.is_finished_ok
Expand All @@ -443,7 +444,13 @@ def launch_all():
assert remote_stash.stash_mode == StashMode.COPY
assert remote_stash.target_basepath.startswith(tmpdir)
assert sorted(remote_stash.source_list) == sorted(source_list)
assert sorted(p for p in os.listdir(remote_stash.target_basepath)) == sorted(source_list)

stashed_filenames = os.listdir(remote_stash.target_basepath)
for filename in source_list:
if '*' in filename:
assert any(re.match(filename, stashed_file) is not None for stashed_file in stashed_filenames)
else:
assert filename in stashed_filenames

# Submitting the calcfunction through the launchers
print('Submitting calcfunction to the daemon')
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
paths:
- '.github/workflows/nightly.yml'
- '.github/workflows/setup.sh'
- '.github/system_tests/test_daemon.py'
- '.molecule/default/files/**'
- 'aiida/storage/psql_dos/migrations/**'
- 'tests/storage/psql_dos/migrations/**'
Expand Down
33 changes: 20 additions & 13 deletions aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,27 +408,34 @@ def stash_calculation(calculation: CalcJobNode, transport: Transport) -> None:
EXEC_LOGGER.debug(f'stashing files for calculation<{calculation.pk}>: {source_list}', extra=logger_extra)

uuid = calculation.uuid
target_basepath = os.path.join(stash_options['target_base'], uuid[:2], uuid[2:4], uuid[4:])
source_basepath = pathlib.Path(calculation.get_remote_workdir())
target_basepath = pathlib.Path(stash_options['target_base']) / uuid[:2] / uuid[2:4] / uuid[4:]

for source_filename in source_list:

source_filepath = os.path.join(calculation.get_remote_workdir(), source_filename)
target_filepath = os.path.join(target_basepath, source_filename)
if transport.has_magic(source_filename):
copy_instructions = []
for globbed_filename in transport.glob(str(source_basepath / source_filename)):
target_filepath = target_basepath / pathlib.Path(globbed_filename).relative_to(source_basepath)
copy_instructions.append((globbed_filename, target_filepath))
else:
copy_instructions = [(source_basepath / source_filename, target_basepath / source_filename)]

# If the source file is in a (nested) directory, create those directories first in the target directory
target_dirname = os.path.dirname(target_filepath)
transport.makedirs(target_dirname, ignore_existing=True)
for source_filepath, target_filepath in copy_instructions:
# If the source file is in a (nested) directory, create those directories first in the target directory
target_dirname = target_filepath.parent
transport.makedirs(str(target_dirname), ignore_existing=True)

try:
transport.copy(source_filepath, target_filepath)
except (IOError, ValueError) as exception:
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
else:
EXEC_LOGGER.debug(f'stashed {source_filepath} to {target_filepath}')
try:
transport.copy(str(source_filepath), str(target_filepath))
except (IOError, ValueError) as exception:
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
else:
EXEC_LOGGER.debug(f'stashed {source_filepath} to {target_filepath}')

remote_stash = cls(
computer=calculation.computer,
target_basepath=target_basepath,
target_basepath=str(target_basepath),
stash_mode=StashMode(stash_mode),
source_list=source_list,
).store()
Expand Down

0 comments on commit e141f97

Please sign in to comment.