Skip to content

Commit

Permalink
Fix the sync of files for the firecrest scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
ekouts committed Nov 14, 2023
1 parent 88c5095 commit 2cf0ac4
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 27 deletions.
15 changes: 8 additions & 7 deletions reframe/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,7 @@ def _setup_paths(self):
except OSError as e:
raise PipelineError('failed to set up paths') from e

def _create_job(self, job_type, force_local=False, **job_opts):
def _create_job(self, job_type, force_local=False, clean_up_stage=False, **job_opts):
'''Setup the job related to this check.'''

if force_local:
Expand All @@ -1692,14 +1692,15 @@ def _create_job(self, job_type, force_local=False, **job_opts):
script_filename=script_name,
workdir=self._stagedir,
sched_access=self._current_partition.access,
clean_up_stage=clean_up_stage,
**job_opts)

def _setup_build_job(self, **job_opts):
def _setup_build_job(self, clean_up_stage=False, **job_opts):
self._build_job = self._create_job(
'build', self.local or self.build_locally, **job_opts
'build', self.local or self.build_locally, clean_up_stage, **job_opts
)

def _setup_run_job(self, **job_opts):
def _setup_run_job(self, clean_up_stage=False, **job_opts):
self._job = self._create_job(f'run', self.local, **job_opts)

def _setup_container_platform(self):
Expand Down Expand Up @@ -1743,7 +1744,7 @@ def setup(self, partition, environ, **job_opts):
self._current_partition = partition
self._current_environ = environ
self._setup_paths()
self._setup_build_job(**job_opts)
self._setup_build_job(clean_up_stage=True, **job_opts)
self._setup_run_job(**job_opts)
self._setup_container_platform()
self._resolve_fixtures()
Expand Down Expand Up @@ -2559,7 +2560,7 @@ def setup(self, partition, environ, **job_opts):
self._current_partition = partition
self._current_environ = environ
self._setup_paths()
self._setup_run_job(**job_opts)
self._setup_run_job(clean_up_stage=True, **job_opts)
self._setup_container_platform()
self._resolve_fixtures()

Expand Down Expand Up @@ -2616,7 +2617,7 @@ def setup(self, partition, environ, **job_opts):
self._current_partition = partition
self._current_environ = environ
self._setup_paths()
self._setup_build_job(**job_opts)
self._setup_build_job(clean_up_stage=True, **job_opts)
self._setup_container_platform()
self._resolve_fixtures()

Expand Down
5 changes: 4 additions & 1 deletion reframe/core/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ def __init__(self,
stderr=None,
sched_flex_alloc_nodes=None,
sched_access=[],
sched_options=None):
sched_options=None,
clean_up_stage=False):

self._cli_options = list(sched_options) if sched_options else []
self._name = name
Expand Down Expand Up @@ -354,6 +355,8 @@ def __init__(self,
# in finished()
self._exception = None

self._clean_up_stage = clean_up_stage

@classmethod
def create(cls, scheduler, launcher, *args, **kwargs):
ret = scheduler.make_job(*args, **kwargs)
Expand Down
31 changes: 12 additions & 19 deletions reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,6 @@ def __init__(self, *args, **kwargs):
self._remotedir = None
self._localdir = None

self._local_filetimestamps = {}
self._remote_filetimestamps = {}
self._first_submission = True

# The compacted nodelist as reported by Slurm. This must be updated in
# every poll as Slurm may be slow in reporting the exact nodelist
Expand Down Expand Up @@ -745,6 +742,9 @@ def __init__(self, *args, **kwargs):
)
self._system_name = 'daint'

self._local_filetimestamps = {}
self._remote_filetimestamps = {}

def make_job(self, *args, **kwargs):
return _SlurmFirecrestJob(*args, **kwargs)

Expand All @@ -759,9 +759,8 @@ def _push_artefacts(self, job):
for f in filenames:
local_norm_path = join_and_normalize(job._localdir, dirpath, f)
modification_time = os.path.getmtime(local_norm_path)
if job._local_filetimestamps.get(local_norm_path) != modification_time:
job._local_filetimestamps[local_norm_path] = modification_time

if self._local_filetimestamps.get(local_norm_path) != modification_time:
self._local_filetimestamps[local_norm_path] = modification_time
self.log(f'Uploading file {f} in {join_and_normalize(job._remotedir, dirpath)}')
self.client.simple_upload(
self._system_name,
Expand All @@ -777,9 +776,7 @@ def _push_artefacts(self, job):
)
for f in remote_files:
local_norm_path = join_and_normalize(remote_dir_path, f['name'])
job._remote_filetimestamps[local_norm_path] = f['last_modified']


self._remote_filetimestamps[local_norm_path] = f['last_modified']

def _pull_artefacts(self, job):
def firecrest_walk(directory):
Expand Down Expand Up @@ -816,18 +813,17 @@ def firecrest_walk(directory):
for (f, modification_time) in files:
norm_path = join_and_normalize(dirpath, f)
local_file_path = join_and_normalize(local_dirpath, f)
if job._remote_filetimestamps.get(norm_path) != modification_time:
if self._remote_filetimestamps.get(norm_path) != modification_time:
self.log(f'Downloading file {f} in {local_dirpath}')
self.client.simple_download(
self._system_name,
norm_path,
local_file_path
)

job._remote_filetimestamps[norm_path] = modification_time

job._local_filetimestamps[local_file_path] = os.path.getmtime(local_file_path)
self._remote_filetimestamps[norm_path] = modification_time

self._local_filetimestamps[local_file_path] = os.path.getmtime(local_file_path)

def submit(self, job):
job._localdir = os.getcwd()
Expand All @@ -836,7 +832,7 @@ def submit(self, job):
os.path.relpath(os.getcwd(), job._stage_prefix)
)

if job._first_submission:
if job._clean_up_stage:
# Create clean stage directory in the remote system
try:
self.client.simple_delete(self._system_name, job._remotedir)
Expand All @@ -845,9 +841,9 @@ def submit(self, job):
pass

self.client.mkdir(self._system_name, job._remotedir, p=True)
job._first_submission = False
self.log(f'Creating remote directory {job._remotedir} in {self._system_name}')

self._push_artefacts(job)
self._push_artefacts(job)

intervals = itertools.cycle([1, 2, 3])
while True:
Expand Down Expand Up @@ -922,9 +918,6 @@ def poll(self, *jobs):

# Use ',' to join nodes to be consistent with Slurm syntax
job._nodespec = ','.join(m['nodelist'] for m in jobarr_info)
# self._update_completion_time(
# job, (m.group('end') for m in jobarr_info)
# )

def wait(self, job):
# Quickly return in case we have finished already
Expand Down

0 comments on commit 2cf0ac4

Please sign in to comment.