Skip to content

Commit

Permalink
Merge branch 'v3.1-rc' of github.com:C2SM/processing-chain into chemi…
Browse files Browse the repository at this point in the history
…cal_renaming
  • Loading branch information
mjaehn committed Feb 7, 2024
2 parents 483193b + 008bf17 commit c32d633
Show file tree
Hide file tree
Showing 12 changed files with 407 additions and 89 deletions.
2 changes: 1 addition & 1 deletion cases/cosmo-ghg-spinup-test/cosmo_runjob.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ echo "============== StartTime: `date +%s` s"
echo "============== StartTime: `date`"
echo "====================================================="

srun -u ./{execname} >> {logfile} 2>&1
srun -u ./{cfg.cosmo_execname} >> {logfile} 2>&1
pid=$?

echo "====================================================="
Expand Down
2 changes: 1 addition & 1 deletion cases/cosmo-ghg-test/cosmo_runjob.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ echo "============== StartTime: `date +%s` s"
echo "============== StartTime: `date`"
echo "====================================================="

srun -u ./{execname} >> {logfile} 2>&1
srun -u ./{cfg.cosmo_execname} >> {logfile} 2>&1
pid=$?

echo "====================================================="
Expand Down
2 changes: 1 addition & 1 deletion cases/icon-art-global-test/icon_runjob.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,4 @@ handle_error(){{
exit 1
fi
}}
srun ./icon.exe || handle_error
srun ./{cfg.icon_execname} || handle_error
2 changes: 1 addition & 1 deletion cases/icon-art-oem-test/icon_runjob.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -376,4 +376,4 @@ handle_error(){{
exit 1
fi
}}
srun ./icon.exe || handle_error
srun ./{cfg.icon_execname} || handle_error
2 changes: 1 addition & 1 deletion cases/icon-test/icon_runjob.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,4 @@ EOF
# ----------------------------------------------------------------------
# run the model!
# ----------------------------------------------------------------------
srun ./icon.exe
srun ./{cfg.icon_execname} || handle_error
74 changes: 27 additions & 47 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,36 +319,18 @@ def create_vars_from_dicts(self, dct=None, key=None):
else:
setattr(self, subkey, v)

def format_duration(self, duration):
    """Render a ``datetime.timedelta`` as a ``"0d 0h 0m 0s"`` string.

    Parameters
    ----------
    duration : datetime.timedelta
        The time span to format.

    Returns
    -------
    str
        The span broken down into whole days, hours, minutes and
        seconds, e.g. ``"1d 2h 3m 4s"``.
    """
    # Peel off each unit with divmod, largest first. The arithmetic is
    # done in float (as total_seconds() returns float) and truncated to
    # int only for display.
    days, rest = divmod(duration.total_seconds(), 86400)
    hours, rest = divmod(rest, 3600)
    minutes, secs = divmod(rest, 60)
    return f"{int(days)}d {int(hours)}h {int(minutes)}m {int(secs)}s"

def get_chunk_list(self):
self.chunk_list = []
for startdate_sim in tools.iter_hours(self.startdate, self.enddate,
self.restart_step_hours):
enddate_sim = startdate_sim + timedelta(
hours=self.restart_step_hours)
if 'spinup' in self.workflow['features'] and hasattr(
self, 'spinup'):
if startdate_sim > self.startdate:
startdate_sim = startdate_sim - timedelta(
hours=self.spinup)

enddate_sim = startdate_sim + timedelta(
hours=self.restart_step_hours)
startdate_sim_yyyymmddhh = startdate_sim.strftime("%Y%m%d%H")
enddate_sim_yyyymmddhh = enddate_sim.strftime("%Y%m%d%H")
chunk_id = f"{startdate_sim_yyyymmddhh}_{enddate_sim_yyyymmddhh}"
Expand All @@ -359,7 +341,7 @@ def get_chunk_list(self):
self.chunk_list.append(chunk_id)

def get_previous_chunk_id(self, current_chunk_id):
"""Get the previous chunk ID based on the current chunk ID."""
"""Get the previous chunk ID based on the current `chunk_id`"""
index = self.chunk_list.index(current_chunk_id)
if index > 0:
self.chunk_id_prev = self.chunk_list[index - 1]
Expand All @@ -381,34 +363,30 @@ def get_dep_ids(self, job_name, add_dep=None):
dep_id_list = []

# Add job dependencies
if not self.force_sync:
# Could be that job has no dependency, even in an async config,
# e.g., prepare_data
if deps := self.workflow['dependencies'].get(job_name):
for stage in 'previous', 'current':
if dep_stage := deps.get(stage):
for job in dep_stage:
# Could be that dep job id does not exist, e.g.,
# if dep job is deactivated or it's the first chunk
if dep_id := self.job_ids[stage].get(job):
dep_id_list.extend(dep_id)
if deps := self.workflow['dependencies'].get(job_name):
for stage in 'previous', 'current':
if dep_stage := deps.get(stage):
for job in dep_stage:
# Could be that dep job id does not exist, e.g.,
# if dep job is deactivated or it's the first chunk
if dep_id := self.job_ids[stage].get(job):
dep_id_list.extend(dep_id)
return dep_id_list

def get_dep_cmd(self, job_name, add_dep=None):
"""Generate the part of the sbatch command that sepcifies dependencies for job_name."""
if not self.force_sync:
# Default: async case
if dep_ids := self.get_dep_ids(job_name, add_dep=add_dep):
dep_str = ':'.join(map(str, dep_ids))
return f'--dependency=afterok:{dep_str}'
else:
# job_name has no dependencies but still belongs to an async workflow
# so don't use --wait
return None
else:
# Needed for nested run_chain.py
"""Generate the part of the sbatch command that sepcifies dependencies for `job_name`"""
# Needed for nested run_chain.py
if self.force_sync:
return '--wait'

if dep_ids := self.get_dep_ids(job_name, add_dep=add_dep):
dep_str = ':'.join(map(str, dep_ids))
return f'--dependency=afterok:{dep_str}'

# job_name has no dependencies but still belongs to an async workflow
# so don't use --wait
return None

def submit(self, job_name, script, add_dep=None):
"""Submit job with dependencies"""
script_path = Path(script)
Expand Down Expand Up @@ -437,10 +415,11 @@ def submit(self, job_name, script, add_dep=None):

return job_id

def create_sbatch_script(self, job_name):
"""Create an sbatch script to launch jobs individually.
def submit_basic_python(self, job_name):
"""Create an sbatch script to launch basic python jobs individually.
Use run_chain.py arguments to submit those jobs.
"""
# Build job script
walltime = getattr(self, 'walltime', {}).get(job_name, "00:30:00")
script_lines = [
'#!/usr/bin/env bash',
Expand All @@ -464,7 +443,8 @@ def create_sbatch_script(self, job_name):
with open(job_file, mode='w') as job_script:
job_script.write('\n'.join(script_lines))

return job_file
# Submit job
self.submit(job_name, job_file)

def wait_for_previous(self):
"""Wait for all jobs of the previous stage to be finished.
Expand Down
1 change: 1 addition & 0 deletions env/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies:
- pillow
- xarray
- cdsapi
- scikit-learn
- sphinx
- sphinx_rtd_theme
- sphinx-copybutton
22 changes: 7 additions & 15 deletions jobs/cosmo.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,20 @@


def main(cfg):
"""Setup the namelists for a COSMO tracer run and submit the job to the queue.
Necessary for both COSMO and COSMOART simulations.
Decide if the soil model should be TERRA or TERRA multi-layer depending on
the ``startdate`` of the simulation.
"""Setup the namelists for a COSMO run and submit the job to the queue.
Create necessary directory structure to run COSMO (run, output, and
restart directories, defined in ``cfg.cosmo_run``, ``cfg.cosmo_output``,
and ``cfg.cosmo_restart_out``).
Copy the COSMO-executable from
``cfg.cosmo_bin`` to ``cfg.cosmo_run/cosmo``.
``cfg.cosmo['binary_file']`` to ``cfg.cosmo_run/cfg.cosmo['execname']``.
Convert the tracer-csv-file to a COSMO-namelist file.
Convert the tracer csv file to a COSMO namelist file.
Format the COSMO-namelist-templates
(COSMO: ``AF,ORG,IO,DYN,PHY,DIA,ASS``,
COSMOART: ``ART,ASS,DIA,DYN,EPS,INI,IO,ORG,PHY``)
using the information in ``cfg``.
Format the COSMO namelist templates using the information in ``cfg``.
Format the runscript-template and submit the job.
Format the runscript template and submit the job.
Parameters
----------
Expand Down Expand Up @@ -128,9 +120,9 @@ def main(cfg):
tools.create_dir(cfg.cosmo_restart_out, "cosmo_restart_out")

# Copy cosmo executable
cfg.cosmo['execname'] = 'cosmo.exe'
cfg.cosmo_execname = Path(cfg.cosmo['binary_file']).name
tools.copy_file(cfg.cosmo['binary_file'],
os.path.join(cfg.cosmo_run, cfg.cosmo['execname']))
cfg.cosmo_run / cfg.cosmo_execname)

# Prepare namelist and submit job
tracer_csvfile = os.path.join(cfg.chain_src_dir, 'cases', cfg.casename,
Expand Down
15 changes: 4 additions & 11 deletions jobs/icon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@
# -*- coding: utf-8 -*-

import logging
from pathlib import Path
from . import tools, prepare_icon

BASIC_PYTHON_JOB = False


def main(cfg):
"""Setup the namelists for an ICON tracer run and submit the job to
"""Setup the namelists for an ICON run and submit the job to
the queue.
Necessary for both ICON and ICONART simulations.
Create necessary directory structure to run ICON (run, output, and
restart directories, defined in ``cfg.icon_work``, ``cfg.icon_output``,
and ``cfg.icon_restart_out``).
Copy the ICON-executable from
``cfg.icon_binary_file`` to ``cfg.icon_work/icon.exe``.
Use the tracer-csv-file to append ICON-namelist file.
Format the ICON-namelist-templates:
``icon_master.namelist.cfg, icon_NAMELIST_NWP.cfg``,
using the information in ``cfg``.
Expand All @@ -40,9 +33,9 @@ def main(cfg):
"submit the job to the queue")

# Copy icon executable
execname = 'icon.exe'
cfg.icon_execname = Path(cfg.icon['binary_file']).name
tools.create_dir(cfg.icon_work, "icon_work")
tools.copy_file(cfg.icon_binary_file, cfg.icon_work / execname)
tools.copy_file(cfg.icon_binary_file, cfg.icon_work / cfg.icon_execname)

# Symlink the restart file to the last run into the icon/run folder
if cfg.lrestart == '.TRUE.':
Expand Down
3 changes: 2 additions & 1 deletion jobs/icontools.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def main(cfg):
merged_file = os.path.join(cfg.icon_input_icbc, merged_filename)

# Copy GEOSP file from last run if not present
if not os.path.exists(geosp_file):
if hasattr(cfg,
'icon_input_icbc_prev') and not os.path.exists(geosp_file):
geosp_src_file = os.path.join(cfg.icon_input_icbc_prev,
geosp_filename)
tools.copy_file(geosp_src_file,
Expand Down
Loading

0 comments on commit c32d633

Please sign in to comment.