diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000000..e3a4e3e47e --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +exclude = .git,.github,venv,__pycache__,old,build,dist +max-line-length = 160 diff --git a/docs/source/configure.rst b/docs/source/configure.rst index 439c5df110..bc37bbf833 100644 --- a/docs/source/configure.rst +++ b/docs/source/configure.rst @@ -48,12 +48,15 @@ The global-workflow configs contain switches that change how the system runs. Ma | | (.true.) or cold (.false)? | | | be set when running ``setup_expt.py`` script with | | | | | | the ``--start`` flag (e.g. ``--start warm``) | +------------------+----------------------------------+---------------+-------------+---------------------------------------------------+ -| HPSSARCH | Archive to HPPS | NO | Possibly | Whether to save output to tarballs on HPPS | +| HPSSARCH | Archive to HPPS | NO | NO | Whether to save output to tarballs on HPPS. | +------------------+----------------------------------+---------------+-------------+---------------------------------------------------+ -| LOCALARCH | Archive to a local directory | NO | Possibly | Instead of archiving data to HPSS, archive to a | -| | | | | local directory, specified by ATARDIR. If | -| | | | | LOCALARCH=YES, then HPSSARCH must =NO. Changing | -| | | | | HPSSARCH from YES to NO will adjust the XML. | +| LOCALARCH | Archive to a local directory | NO | NO | Whether to save output to tarballs locally. For | +| | | | | HPSSARCH and LOCALARCH, ARCDIR specifies the | +| | | | | directory. These options are mutually exclusive. | ++------------------+----------------------------------+---------------+-------------+---------------------------------------------------+ +| ARCH_EXPDIR | Archive the EXPDIR | NO | NO | Whether to create a tarball of the EXPDIR. | +| | | | | ARCH_HASHES and ARCH_DIFFS generate text files | +| | | | | of git output that are archived with the EXPDIR. | +------------------+----------------------------------+---------------+-------------+---------------------------------------------------+ | QUILTING | Use I/O quilting | .true. | NO | If .true. choose OUTPUT_GRID as cubed_sphere_grid | | | | | | in netcdf or gaussian_grid | diff --git a/parm/archive/expdir.yaml.j2 b/parm/archive/expdir.yaml.j2 new file mode 100644 index 0000000000..e2ec3f4736 --- /dev/null +++ b/parm/archive/expdir.yaml.j2 @@ -0,0 +1,24 @@ +{% set cycle_YMDH = current_cycle | to_YMDH %} + +expdir: + name: "EXPDIR" + # Copy the experiment files from the EXPDIR into the ROTDIR for archiving + {% set copy_expdir = "expdir." ~ cycle_YMDH %} + FileHandler: + mkdir: + - "{{ ROTDIR }}/{{ copy_expdir }}" + copy: + {% for config in glob(EXPDIR ~ "/config.*") %} + - [ "{{ config }}", "{{ ROTDIR }}/{{ copy_expdir }}/." ] + {% endfor %} + - [ "{{ EXPDIR }}/{{ PSLOT }}.xml", "{{ ROTDIR }}/{{ copy_expdir }}/." ] + {% if ARCH_HASHES or ARCH_DIFFS %} + - [ "{{ EXPDIR }}/git_info.log", "{{ ROTDIR }}/{{ copy_expdir }}/." ] + {% endif %} + target: "{{ ATARDIR }}/{{ cycle_YMDH }}/expdir.tar" + required: + - "{{ copy_expdir }}/config.*" + - "{{ copy_expdir }}/{{ PSLOT }}.xml" + {% if ARCH_HASHES or ARCH_DIFFS %} + - "{{ copy_expdir }}/git_info.log" + {% endif %} diff --git a/parm/archive/master_gdas.yaml.j2 b/parm/archive/master_gdas.yaml.j2 index 11e83d387b..b3d6560012 100644 --- a/parm/archive/master_gdas.yaml.j2 +++ b/parm/archive/master_gdas.yaml.j2 @@ -40,7 +40,7 @@ datasets: # Determine if we will save restart ICs or not (only valid for cycled) {% set save_warm_start_forecast, save_warm_start_cycled = ( False, False ) %} - {% if ARCH_CYC == cycle_HH | int%} + {% if ARCH_CYC == cycle_HH | int %} # Save the forecast-only cycle ICs every ARCH_WARMICFREQ or ARCH_FCSTICFREQ days {% if (current_cycle - SDATE).days % ARCH_WARMICFREQ == 0 %} {% set save_warm_start_forecast = True %} @@ -97,3 +97,10 @@ datasets: # End of restart checking {% endif %} + +# Archive the EXPDIR if requested +{% if archive_expdir %} +{% filter indent(width=4) %} +{% include "expdir.yaml.j2" %} +{% endfilter %} +{% endif %} diff --git a/parm/archive/master_gefs.yaml.j2 b/parm/archive/master_gefs.yaml.j2 index 5dc046dcfd..e76d7c9f7a 100644 --- a/parm/archive/master_gefs.yaml.j2 +++ b/parm/archive/master_gefs.yaml.j2 @@ -10,3 +10,10 @@ datasets: {% include "gefs_extracted_ice.yaml.j2" %} {% include "gefs_extracted_wave.yaml.j2" %} {% endfilter %} + +# Archive the EXPDIR if requested +{% if archive_expdir %} +{% filter indent(width=4) %} +{% include "expdir.yaml.j2" %} +{% endfilter %} +{% endif %} diff --git a/parm/archive/master_gfs.yaml.j2 b/parm/archive/master_gfs.yaml.j2 index e7187d70d5..dc8c0640e5 100644 --- a/parm/archive/master_gfs.yaml.j2 +++ b/parm/archive/master_gfs.yaml.j2 @@ -98,3 +98,10 @@ datasets: {% endfilter %} {% endif %} {% endif %} + +# Archive the EXPDIR if requested +{% if archive_expdir %} +{% filter indent(width=4) %} +{% include "expdir.yaml.j2" %} +{% endfilter %} +{% endif %} diff --git a/parm/config/gefs/config.base b/parm/config/gefs/config.base index 65bca1b377..588678e02f 100644 --- a/parm/config/gefs/config.base +++ b/parm/config/gefs/config.base @@ -326,9 +326,13 @@ if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then echo "Both HPSS and local archiving selected. Please choose one or the other." exit 3 fi -export ARCH_CYC=00 # Archive data at this cycle for warm_start capability -export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability +export ARCH_CYC=00 # Archive data at this cycle for warm start and/or forecast-only capabilities +export ARCH_WARMICFREQ=4 # Archive frequency in days for warm start capability export ARCH_FCSTICFREQ=1 # Archive frequency in days for gdas and gfs forecast-only capability +export ARCH_EXPDIR='YES' # Archive the EXPDIR configs, XML, and database +export ARCH_EXPDIR_FREQ=0 # How often to archive the EXPDIR in hours or 0 for first and last cycle only +export ARCH_HASHES='YES' # Archive the hashes of the GW and submodules and 'git status' for each; requires ARCH_EXPDIR +export ARCH_DIFFS='NO' # Archive the output of 'git diff' for the GW; requires ARCH_EXPDIR export DELETE_COM_IN_ARCHIVE_JOB="YES" # NO=retain ROTDIR. YES default in arch.sh and earc.sh. diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base index 46e27b4541..cadefd0ad9 100644 --- a/parm/config/gfs/config.base +++ b/parm/config/gfs/config.base @@ -476,9 +476,13 @@ if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then echo "FATAL ERROR: Both HPSS and local archiving selected. Please choose one or the other." exit 4 fi -export ARCH_CYC=00 # Archive data at this cycle for warm_start capability -export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability +export ARCH_CYC=00 # Archive data at this cycle for warm start and/or forecast-only capabilities +export ARCH_WARMICFREQ=4 # Archive frequency in days for warm start capability export ARCH_FCSTICFREQ=1 # Archive frequency in days for gdas and gfs forecast-only capability +export ARCH_EXPDIR='YES' # Archive the EXPDIR configs, XML, and database +export ARCH_EXPDIR_FREQ=0 # How often to archive the EXPDIR in hours or 0 for first and last cycle only +export ARCH_HASHES='YES' # Archive the hashes of the GW and submodules and 'git status' for each; requires ARCH_EXPDIR +export ARCH_DIFFS='NO' # Archive the output of 'git diff' for the GW; requires ARCH_EXPDIR # The monitor jobs are not yet supported for JEDIATMVAR. if [[ ${DO_JEDIATMVAR} = "YES" ]]; then diff --git a/scripts/exglobal_archive.py b/scripts/exglobal_archive.py index 2d3fa58313..9f9da7e22e 100755 --- a/scripts/exglobal_archive.py +++ b/scripts/exglobal_archive.py @@ -3,7 +3,7 @@ import os from pygfs.task.archive import Archive -from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit +from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit, chdir # initialize root logger logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True) @@ -32,7 +32,8 @@ def main(): 'DO_AERO_ANL', 'DO_AERO_FCST', 'DOIBP_WAV', 'DO_JEDIOCNVAR', 'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS', 'waveGRD', 'IAUFHRS', 'DO_FIT2OBS', 'NET', 'FHOUT_HF_GFS', 'FHMAX_HF_GFS', 'REPLAY_ICS', - 'OFFSET_START_HOUR'] + 'OFFSET_START_HOUR', 'ARCH_EXPDIR', 'EXPDIR', 'ARCH_EXPDIR_FREQ', 'ARCH_HASHES', + 'ARCH_DIFFS', 'SDATE', 'EDATE', 'HOMEgfs'] archive_dict = AttrDict() for key in keys: @@ -47,21 +48,20 @@ def main(): if archive_dict[key] is None: print(f"Warning: key ({key}) not found in task_config!") - cwd = os.getcwd() + with chdir(config.ROTDIR): - os.chdir(config.ROTDIR) + # Determine which archives to create + arcdir_set, atardir_sets = archive.configure(archive_dict) - # Determine which archives to create - arcdir_set, atardir_sets = archive.configure(archive_dict) + # Populate the product archive (ARCDIR) + archive.execute_store_products(arcdir_set) - # Populate the product archive (ARCDIR) - archive.execute_store_products(arcdir_set) + # Create the backup tarballs and store in ATARDIR + for atardir_set in atardir_sets: + archive.execute_backup_dataset(atardir_set) - # Create the backup tarballs and store in ATARDIR - for atardir_set in atardir_sets: - archive.execute_backup_dataset(atardir_set) - - os.chdir(cwd) + # Clean up any temporary files + archive.clean() if __name__ == '__main__': diff --git a/sorc/wxflow b/sorc/wxflow index e1ef697430..a7b49e9cc7 160000 --- a/sorc/wxflow +++ b/sorc/wxflow @@ -1 +1 @@ -Subproject commit e1ef697430c09d2b1a0560f21f11c7a32ed5f3e2 +Subproject commit a7b49e9cc76ef4b50cc1c28d4b7959ebde99c5f5 diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index f1d8cdf865..c6376206b3 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -7,10 +7,11 @@ from logging import getLogger from typing import Any, Dict, List -from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, - chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, strftime, - to_YMDH) +from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, to_timedelta, + chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, rmdir, + strftime, to_YMDH, which, chdir, ProcessError) +git_filename = "git_info.log" logger = getLogger(__name__.split('.')[-1]) @@ -43,6 +44,9 @@ def __init__(self, config: Dict[str, Any]) -> None: # Extend task_config with path_dict self.task_config = AttrDict(**self.task_config, **path_dict) + # Boolean used for cleanup if the EXPDIR was archived + self.archive_expdir = False + @logit(logger) def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]): """Determine which tarballs will need to be created. @@ -109,6 +113,16 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str self.tar_cmd = "" return arcdir_set, [] + # Determine if we are archiving the EXPDIR this cycle (always skip for ensembles) + if "enkf" not in arch_dict.RUN and arch_dict.ARCH_EXPDIR: + self.archive_expdir = self._archive_expdir(arch_dict) + arch_dict.archive_expdir = self.archive_expdir + + if self.archive_expdir: + # If requested, get workflow hashes/statuses/diffs for EXPDIR archiving + if arch_dict.ARCH_HASHES or arch_dict.ARCH_DIFFS: + self._pop_git_info(arch_dict) + master_yaml = "master_" + arch_dict.RUN + ".yaml.j2" parsed_sets = parse_j2yaml(os.path.join(archive_parm, master_yaml), @@ -195,6 +209,12 @@ def _create_fileset(atardir_set: Dict[str, Any]) -> List: """ fileset = [] + # Check if any external files need to be brought into the ROTDIR (i.e. EXPDIR contents) + if "FileHandler" in atardir_set: + # Run the file handler to stage files for archiving + FileHandler(atardir_set["FileHandler"]).sync() + + # Check that all required files are present and add them to the list of files to archive if "required" in atardir_set: if atardir_set.required is not None: for item in atardir_set.required: @@ -204,6 +224,7 @@ def _create_fileset(atardir_set: Dict[str, Any]) -> List: for entry in glob_set: fileset.append(entry) + # Check for optional files and add found items to the list of files to archive if "optional" in atardir_set: if atardir_set.optional is not None: for item in atardir_set.optional: @@ -244,7 +265,7 @@ def _has_rstprod(fileset: List) -> bool: return False @logit(logger) - def _protect_rstprod(self, atardir_set: Dict[str, any]) -> None: + def _protect_rstprod(self, atardir_set: Dict[str, Any]) -> None: """ Changes the group of the target tarball to rstprod and the permissions to 640. If this fails for any reason, attempt to delete the file before exiting. @@ -289,7 +310,7 @@ def _create_tarball(target: str, fileset: List) -> None: tarball.add(filename) @logit(logger) - def _gen_relative_paths(self, root_path: str) -> Dict: + def _gen_relative_paths(self, root_path: str) -> Dict[str, Any]: """Generate a dict of paths in self.task_config relative to root_path Parameters @@ -417,3 +438,147 @@ def replace_string_from_to_file(filename_in, filename_out, search_str, replace_s replace_string_from_to_file(in_track_p_file, out_track_p_file, "AVNO", pslot4) return + + @logit(logger) + def _archive_expdir(self, arch_dict: Dict[str, Any]) -> bool: + """ + This function checks if the EXPDIR should be archived this RUN/cycle + and returns the temporary path in the ROTDIR where the EXPDIR will be + copied to for archiving. + + Parameters + ---------- + arch_dict: Dict + Dictionary with required parameters, including the following: + + current_cycle: Datetime + Date of the current cycle. + SDATE: Datetime + Starting cycle date. + EDATE: Datetime + Ending cycle date. + NET: str + The workflow type (gfs or gefs) + ARCH_EXPDIR_FREQ: int + Frequency to perform EXPDIR archiving + ROTDIR: str + Full path to the ROTDIR + """ + + # Get commonly used variables + current_cycle = arch_dict.current_cycle + sdate = arch_dict.SDATE + edate = arch_dict.EDATE + mode = arch_dict.MODE + assim_freq = to_timedelta(f"+{arch_dict.assim_freq}H") + # Convert frequency to seconds from hours + freq = arch_dict.ARCH_EXPDIR_FREQ * 3600 + + # Skip gfs and enkf cycled RUNs (only archive during gdas RUNs) + # (do not skip forecast-only, regardless of RUN) + if arch_dict.NET == "gfs" and arch_dict.MODE == "cycled" and arch_dict.RUN != "gdas": + return False + + # Determine if we should skip this cycle + # If the frequency is set to 0, only run on sdate (+assim_freq for cycled) and edate + first_full = sdate + if mode in ["cycled"]: + first_full += assim_freq + if current_cycle in [first_full, edate]: + # Always save the first and last + return True + elif (current_cycle - first_full).total_seconds() % freq == 0: + # Otherwise, the frequency is in hours + return True + else: + return False + + @logit(logger) + def _pop_git_info(self, arch_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + This function checks the configuration options ARCH_HASHES and ARCH_DIFFS + and ARCH_EXPDIR_FREQ to determine if the git hashes and/or diffs should be + added to the EXPDIR for archiving and execute the commands. The hashes and + diffs will be stored in EXPDIR/git_info.log. + + Parameters + ---------- + arch_dict: Dict + Dictionary with required parameters, including the following: + + EXPDIR: str + Location of the EXPDIR + HOMEgfs: str + Location of the HOMEgfs (the global workflow) + ARCH_HASHES: bool + Whether to archive git hashes of the workflow and submodules + ARCH_DIFFS: bool + Whether to archive git diffs of the workflow and submodules + """ + + # Get commonly used variables + arch_hashes = arch_dict.ARCH_HASHES + arch_diffs = arch_dict.ARCH_DIFFS + homegfs = arch_dict.HOMEgfs + expdir = arch_dict.EXPDIR + + # Find the git command + git = which('git') + if git is None: + raise FileNotFoundError("FATAL ERROR: the git command could not be found!") + + output = "" + # Navigate to HOMEgfs to run the git commands + with chdir(homegfs): + + # Are we running git to get hashes? + if arch_hashes: + output += "Global workflow hash:\n" + + try: + output += git("rev-parse", "HEAD", output=str) + output += "\nSubmodule hashes:\n" + output += git("submodule", "status", output=str) + except ProcessError as pe: + raise OSError("FATAL ERROR Failed to run git") from pe + + # Are we running git to get diffs? + if arch_diffs: + output += "Global workflow diffs:\n" + # This command will only work on git v2.14+ + try: + output += git("diff", "--submodule=diff", output=str) + except ProcessError: + # The version of git may be too old. See if we can run just a surface diff. + try: + output += git("diff", output=str) + print("WARNING git was unable to do a recursive diff.\n" + "Only a top level diff was performed.\n" + "Note that the git version must be >= 2.14 for this feature.") + except ProcessError as pe: + raise OSError("FATAL ERROR Failed to run 'git diff'") from pe + + # Write out to the log file + try: + with open(os.path.join(expdir, git_filename), 'w') as output_file: + output_file.write(output) + except OSError as ose: + fname = os.path.join(expdir, git_filename) + raise OSError(f"FATAL ERROR Unable to write git output to '{fname}'") from ose + + return + + @logit(logger) + def clean(self): + """ + Remove the temporary directories/files created by the Archive task. + Presently, this is only the ROTDIR/expdir directory if EXPDIR archiving + was performed. + """ + + if self.archive_expdir: + temp_expdir_path = os.path.join(self.task_config.ROTDIR, "expdir." + + to_YMDH(self.task_config.current_cycle)) + rmdir(temp_expdir_path) + + return