Skip to content

Commit

Permalink
Fix post_cosmo and add functions to set config variables
Browse files Browse the repository at this point in the history
  • Loading branch information
mjaehn committed Sep 30, 2023
1 parent 005544d commit cb8a884
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 77 deletions.
2 changes: 1 addition & 1 deletion cases/cosmo-ghg-11km-test/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ reduce_output:
output_levels: 20

post_cosmo:
output_root: null # Set this value based on your use case
output_root: ./output/cosmo-ghg-spinup

verify_chain:
reference_dir: null # Set this value based on your use case
Expand Down
61 changes: 53 additions & 8 deletions jobs/cosmo.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,55 @@
import logging
import os
import subprocess
import csv
from .tools import write_cosmo_input_ghg
from . import tools
from datetime import datetime, timedelta


def set_cfg_variables(cfg, model_cfg):

setattr(cfg, 'cosmo_base', os.path.join(cfg.chain_root, 'cosmo'))
setattr(cfg, 'cosmo_input', os.path.join(cfg.chain_root, 'cosmo', 'input'))
setattr(cfg, 'cosmo_run', os.path.join(cfg.chain_root, 'cosmo', 'run'))
setattr(cfg, 'cosmo_output', os.path.join(cfg.chain_root, 'cosmo',
'output'))
setattr(cfg, 'cosmo_output_reduced',
os.path.join(cfg.chain_root, 'cosmo', 'output_reduced'))

# Number of tracers
if 'tracers' in model_cfg['models'][cfg.model]['features']:
tracer_csvfile = os.path.join(cfg.chain_src_dir, 'cases',
cfg.casename, 'cosmo_tracers.csv')
if os.path.isfile(tracer_csvfile):
with open(tracer_csvfile, 'r') as csv_file:
reader = csv.DictReader(csv_file, delimiter=',')
reader = [r for r in reader if r[''] != '#']
setattr(cfg, 'in_tracers', len(reader))
else:
raise FileNotFoundError(f"File not found: {tracer_csvfile}")

# tracer_start namelist paramter for spinup simulation
if hasattr(cfg, 'spinup'):
if cfg.first_one:
setattr(cfg, 'tracer_start', 0)
else:
setattr(cfg, 'tracer_start', cfg.spinup)
else:
setattr(cfg, 'tracer_start', 0)

# asynchronous I/O
if hasattr(cfg, 'cfg.cosmo_np_io'):
if cfg.cosmo_np_io == 0:
setattr(cfg, 'lasync_io', '.FALSE.')
setattr(cfg, 'num_iope_percomm', 0)
else:
setattr(cfg, 'lasync_io', '.TRUE.')
setattr(cfg, 'num_iope_percomm', 1)

return cfg


def main(starttime, hstart, hstop, cfg, model_cfg):
"""Setup the namelists for a **COSMO** tracer run and submit the job to
the queue
Expand All @@ -29,11 +73,11 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
``startdate`` of the simulation.
Create necessary directory structure to run **COSMO** (run, output and
restart directories, defined in ``cfg.cosmo_work``, ``cfg.cosmo_output``
restart directories, defined in ``cfg.cosmo_run``, ``cfg.cosmo_output``
and ``cfg.cosmo_restart_out``).
Copy the **COSMO**-executable from
``cfg.cosmo_bin`` to ``cfg.cosmo_work/cosmo``.
``cfg.cosmo_bin`` to ``cfg.cosmo_run/cosmo``.
Convert the tracer-csv-file to a **COSMO**-namelist file.
Expand All @@ -56,14 +100,15 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
cfg : config-object
Object holding all user-configuration parameters as attributes
"""
cfg = set_cosmo_variables(cfg, model_cfg)
logfile = os.path.join(cfg.log_working_dir, "cosmo")
logfile_finish = os.path.join(cfg.log_finished_dir, "cosmo")

logging.info("Setup the namelist for a COSMO tracer run and "
"submit the job to the queue")

# Create directories
tools.create_dir(cfg.cosmo_work, "cosmo_work")
tools.create_dir(cfg.cosmo_run, "cosmo_run")
tools.create_dir(cfg.cosmo_output, "cosmo_output")

# Total number of processes
Expand Down Expand Up @@ -148,7 +193,7 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
# Copy cosmo executable
cfg.cosmo['execname'] = cfg.model.lower()
tools.copy_file(cfg.cosmo['binary_file'],
os.path.join(cfg.cosmo_work, cfg.cosmo['execname']))
os.path.join(cfg.cosmo_run, cfg.cosmo['execname']))

# Prepare namelist and submit job
tracer_csvfile = os.path.join(cfg.chain_src_dir, 'cases', cfg.casename,
Expand All @@ -173,7 +218,7 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
with open(namelist_file) as input_file:
cosmo_namelist = input_file.read()

output_file = os.path.join(cfg.cosmo_work, "INPUT_" + section)
output_file = os.path.join(cfg.cosmo_run, "INPUT_" + section)
with open(output_file, "w") as outf:
if hasattr(cfg, 'spinup'):
# no built-in restarts
Expand All @@ -199,7 +244,7 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
# Append INPUT_GHG namelist with tracer definitions from csv file
if os.path.isfile(tracer_csvfile):
if cfg.model == 'cosmo-ghg':
input_ghg_filename = os.path.join(cfg.cosmo_work, 'INPUT_GHG')
input_ghg_filename = os.path.join(cfg.cosmo_run, 'INPUT_GHG')

write_cosmo_input_ghg.main(tracer_csvfile, input_ghg_filename, cfg)

Expand All @@ -209,7 +254,7 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
with open(runscript_file) as input_file:
cosmo_runscript = input_file.read()

output_file = os.path.join(cfg.cosmo_work, "run.job")
output_file = os.path.join(cfg.cosmo_run, "run.job")
with open(output_file, "w") as outf:
outf.write(
cosmo_runscript.format(cfg=cfg,
Expand All @@ -220,7 +265,7 @@ def main(starttime, hstart, hstop, cfg, model_cfg):

result = subprocess.run(
["sbatch", "--wait",
os.path.join(cfg.cosmo_work, 'run.job')])
os.path.join(cfg.cosmo_run, 'run.job')])
exitcode = result.returncode
if exitcode != 0:
raise RuntimeError("sbatch returned exitcode {}".format(exitcode))
20 changes: 13 additions & 7 deletions jobs/int2lm.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@
import pytz


def set_cfg_variables(cfg):

# Int2lm processing always starts at hstart=0, thus modifying inidate
inidate_int2lm_yyyymmddhh = (
cfg.startdate + timedelta(hours=cfg.hstart)).strftime('%Y%m%d%H')
setattr(cfg, 'inidate_int2lm_yyyymmddhh', inidate_int2lm_yyyymmddhh)
setattr(cfg, 'int2lm_run', os.path.join(cfg.chain_root, 'int2lm', 'run'))
setattr(cfg, 'int2lm_output',
os.path.join(cfg.chain_root, 'int2lm', 'output'))

return cfg

def main(starttime, hstart, hstop, cfg, model_cfg):
"""Setup the namelist for **int2lm** and submit the job to the queue.
Expand Down Expand Up @@ -55,20 +67,14 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
cfg : config-object
Object holding all user-configuration parameters as attributes
"""
# Int2lm processing always starts at hstart=0, thus modifying inidate
inidate_int2lm_yyyymmddhh = (
cfg.startdate + timedelta(hours=cfg.hstart)).strftime('%Y%m%d%H')
setattr(cfg, 'inidate_int2lm_yyyymmddhh', inidate_int2lm_yyyymmddhh)
cfg = set_int2lm_variables(cfg)
hstart_int2lm = 0
hstop_int2lm = cfg.forecasttime

# Total number of processes
np_tot = cfg.int2lm['np_x'] * cfg.int2lm['np_y']

# Set folder names
setattr(cfg, 'int2lm_run', os.path.join(cfg.chain_root, 'int2lm', 'run'))
setattr(cfg, 'int2lm_output',
os.path.join(cfg.chain_root, 'int2lm', 'output'))
int2lm_run = os.path.join(cfg.int2lm_run)
int2lm_output = os.path.join(cfg.int2lm_output)

Expand Down
43 changes: 23 additions & 20 deletions jobs/post_cosmo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import glob
from subprocess import call

from . import tools
from . import tools, int2lm, cosmo


def logfile_header_template():
Expand All @@ -29,7 +29,7 @@ def runscript_header_template():
"#SBATCH --job-name=post_cosmo", "#SBATCH --partition=xfer",
"#SBATCH --constraint={constraint}",
"#SBATCH --account={compute_account}", "#SBATCH --output={logfile}",
"#SBATCH --open-mode=append", "#SBATCH --chdir={cosmo_work}",
"#SBATCH --open-mode=append", "#SBATCH --chdir={cosmo_run}",
"#SBATCH --time=00:30:00", "", ""
])

Expand All @@ -44,8 +44,8 @@ def runscript_commands_template():
commands = list()

return '\n'.join([
"srun cp -Raf {int2lm_work_src}/. {int2lm_work_dest}/",
"srun cp -Raf {cosmo_work_src}/. {cosmo_work_dest}/",
"srun cp -Raf {int2lm_run_src}/. {int2lm_run_dest}/",
"srun cp -Raf {cosmo_run_src}/. {cosmo_run_dest}/",
"srun cp -Raf {cosmo_output_src}/. {cosmo_output_dest}/",
"srun cp -Raf {logs_src}/. {logs_dest}/"
])
Expand All @@ -55,8 +55,8 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
"""Copy the output of a **COSMO**-run to a user-defined position.
Write a runscript to copy all files (**COSMO** settings & output,
**int2lm** settings, logfiles) from ``cfg.cosmo_work``,
``cfg.cosmo_output``, ``cfg.int2lm_work``, ``cfg.log_finished_dir`` to
**int2lm** settings, logfiles) from ``cfg.cosmo_run``,
``cfg.cosmo_output``, ``cfg.int2lm_run``, ``cfg.log_finished_dir`` to
``cfg.output_root/...`` .
If the job ``reduce_output`` has been run before ``post_cosmo``, a
directory ``cfg.cosmo_output_reduced`` is created. In this case,
Expand All @@ -75,16 +75,14 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
cfg : config-object
Object holding all user-configuration parameters as attributes
"""
if cfg.compute_host != "daint":
logging.error("The copy script is supposed to be run on daint only,"
"not on {}".format(cfg.compute_host))
raise RuntimeError("Wrong compute host for copy-script")
cfg = int2lm.set_cfg_variables(cfg)
cfg = cosmo.set_cfg_variables(cfg, model_cfg)

logfile = os.path.join(cfg.log_working_dir, "post_cosmo")
cosmo_work_dir = cfg.cosmo_work
runscript_path = os.path.join(cfg.cosmo_work, "post_cosmo.job")
cosmo_run_dir = cfg.cosmo_run
runscript_path = os.path.join(cfg.cosmo_run, "post_cosmo.job")
copy_path = os.path.join(
cfg.output_root,
cfg.post_cosmo['output_root'],
starttime.strftime('%Y%m%d%H') + "_" + str(int(hstart)) + "_" +
str(int(hstop)))

Expand All @@ -97,7 +95,7 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
compute_account=cfg.compute_account,
logfile=logfile,
constraint=cfg.constraint,
cosmo_work=cfg.cosmo_work)
cosmo_run=cfg.cosmo_run)

if os.path.isdir(cfg.cosmo_output_reduced):
cosmo_output_src = cfg.cosmo_output_reduced.rstrip('/')
Expand All @@ -113,17 +111,22 @@ def main(starttime, hstart, hstop, cfg, model_cfg):
os.makedirs(cosmo_output_dest, exist_ok=True)
os.makedirs(os.path.join(copy_path, "logs"), exist_ok=True)

int2lm_run_path = os.path.abspath(os.path.join(copy_path, "int2lm_run"))
cosmo_run_path = os.path.abspath(os.path.join(copy_path, "cosmo_run"))
cosmo_output_dest_path = os.path.abspath(cosmo_output_dest)
logs_path = os.path.abspath(os.path.join(copy_path, "logs"))

# Format the runscript
runscript_content += runscript_commands_template().format(
target_dir=copy_path.rstrip('/'),
int2lm_work_src=cfg.int2lm_work.rstrip('/'),
int2lm_work_dest=os.path.join(copy_path, "int2lm_run").rstrip('/'),
cosmo_work_src=cfg.cosmo_work.rstrip('/'),
cosmo_work_dest=os.path.join(copy_path, "cosmo_run").rstrip('/'),
int2lm_run_src=cfg.int2lm_run.rstrip('/'),
int2lm_run_dest=int2lm_run_path.rstrip('/'),
cosmo_run_src=cfg.cosmo_run.rstrip('/'),
cosmo_run_dest=cosmo_run_path.rstrip('/'),
cosmo_output_src=cosmo_output_src,
cosmo_output_dest=cosmo_output_dest,
cosmo_output_dest=cosmo_output_dest_path,
logs_src=cfg.log_finished_dir.rstrip('/'),
logs_dest=os.path.join(copy_path, "logs").rstrip('/'))
logs_dest=logs_path.rstrip('/'))

# Wait for Cosmo to finish first
tools.check_job_completion(cfg.log_finished_dir, "cosmo")
Expand Down
41 changes: 0 additions & 41 deletions run_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,47 +282,6 @@ def run_chain(work_root, model_cfg, cfg, start_time, hstart, hstop, job_names,
setattr(cfg, 'job_id', job_id)
setattr(cfg, 'chain_root', chain_root)

if cfg.model.startswith('cosmo'):
# COSMO
setattr(cfg, 'cosmo_base', os.path.join(chain_root, 'cosmo'))
setattr(cfg, 'cosmo_input', os.path.join(chain_root, 'cosmo', 'input'))
setattr(cfg, 'cosmo_work', os.path.join(chain_root, 'cosmo', 'run'))
setattr(cfg, 'cosmo_output', os.path.join(chain_root, 'cosmo',
'output'))
setattr(cfg, 'cosmo_output_reduced',
os.path.join(chain_root, 'cosmo', 'output_reduced'))

# Number of tracers
if 'tracers' in model_cfg['models'][cfg.model]['features']:
tracer_csvfile = os.path.join(cfg.chain_src_dir, 'cases',
cfg.casename, 'cosmo_tracers.csv')
if os.path.isfile(tracer_csvfile):
with open(tracer_csvfile, 'r') as csv_file:
reader = csv.DictReader(csv_file, delimiter=',')
reader = [r for r in reader if r[''] != '#']
setattr(cfg, 'in_tracers', len(reader))
else:
raise FileNotFoundError(f"File not found: {tracer_csvfile}")

# tracer_start namelist paramter for spinup simulation
if hasattr(cfg, 'spinup'):
if cfg.first_one:
setattr(cfg, 'tracer_start', 0)
else:
setattr(cfg, 'tracer_start', cfg.spinup)
else:
setattr(cfg, 'tracer_start', 0)

# asynchronous I/O
if hasattr(cfg, 'cfg.cosmo_np_io'):
if cfg.cosmo_np_io == 0:
setattr(cfg, 'lasync_io', '.FALSE.')
setattr(cfg, 'num_iope_percomm', 0)
else:
setattr(cfg, 'lasync_io', '.TRUE.')
setattr(cfg, 'num_iope_percomm', 1)

# constraint (gpu or mc)
if hasattr(cfg, 'constraint'):
assert cfg.constraint in ['gpu', 'mc'], ("Unknown constraint, use"
"gpu or mc")
Expand Down

0 comments on commit cb8a884

Please sign in to comment.