Archive the experiment directory along with git status/diff output #3105

Open

DavidHuber-NOAA wants to merge 20 commits into develop from feature/archive_expdir
Changes from 15 commits

Commits (20)
e7e43d2
First crack at archiving the expdir and git status
DavidHuber-NOAA Nov 15, 2024
9f1a6fc
Merge remote-tracking branch 'origin/develop' into feature/archive_ex…
DavidHuber-NOAA Nov 15, 2024
2746e46
Add flake8 rules for the global workflow
DavidHuber-NOAA Nov 18, 2024
1579844
Copy expdir to ROTDIR before tar'ing
DavidHuber-NOAA Nov 18, 2024
95f04d4
Merge branch 'develop' into feature/archive_expdir
DavidHuber-NOAA Nov 18, 2024
23b0454
Allow users to specify their HPC account
DavidHuber-NOAA Nov 19, 2024
f88c077
Start archiving EXPDIR on the first full cycle and name expdir direct…
DavidHuber-NOAA Nov 19, 2024
6924428
Merge branch 'feature/archive_expdir' of github.com:davidhuber-noaa/g…
DavidHuber-NOAA Nov 19, 2024
d2db267
Merge remote-tracking branch 'origin/develop' into feature/archive_ex…
DavidHuber-NOAA Nov 19, 2024
7d0b70a
Trimmed .flake8 exclusions
DavidHuber-NOAA Nov 19, 2024
5f67730
Merge remote-tracking branch 'origin/develop' into feature/archive_ex…
DavidHuber-NOAA Nov 20, 2024
a61055e
Merge branch 'feature/archive_expdir' of github.com:davidhuber-noaa/g…
DavidHuber-NOAA Nov 20, 2024
f385f0c
Merge branch 'develop' into feature/archive_expdir
DavidHuber-NOAA Nov 25, 2024
1b4340e
Merge remote-tracking branch 'emc/develop' into feature/archive_expdir
DavidHuber-NOAA Nov 26, 2024
7ca219c
Merge branch 'NOAA-EMC:develop' into feature/archive_expdir
DavidHuber-NOAA Dec 2, 2024
12b6f95
Remove extra A option
DavidHuber-NOAA Dec 2, 2024
b8cc2ef
Add documentation on EXPDIR archiving
DavidHuber-NOAA Dec 2, 2024
5014922
Merge remote-tracking branch 'origin/develop' into feature/archive_ex…
DavidHuber-NOAA Dec 3, 2024
7ab8dc1
Apply suggestions from code review
DavidHuber-NOAA Dec 3, 2024
0dd3eb8
Apply suggestions from code review
DavidHuber-NOAA Dec 3, 2024
3 changes: 3 additions & 0 deletions .flake8
@@ -0,0 +1,3 @@
[flake8]
exclude = .git,.github,venv,__pycache__,old,build,dist
max-line-length = 160
26 changes: 26 additions & 0 deletions parm/archive/expdir.yaml.j2
@@ -0,0 +1,26 @@
{% set cycle_YMDH = current_cycle | to_YMDH %}

expdir:
name: "EXPDIR"
# Copy the experiment files from the EXPDIR into the ROTDIR for archiving
{% set copy_expdir = "expdir." ~ cycle_YMDH %}
FileHandler:
mkdir:
- "{{ ROTDIR }}/{{ copy_expdir }}"
copy:
{% for config in glob(EXPDIR ~ "/config.*") %}
- [ "{{ config }}", "{{ ROTDIR }}/{{ copy_expdir }}/." ]
{% endfor %}
- [ "{{ EXPDIR }}/{{ PSLOT }}.db", "{{ ROTDIR }}/{{ copy_expdir }}/." ]
- [ "{{ EXPDIR }}/{{ PSLOT }}.xml", "{{ ROTDIR }}/{{ copy_expdir }}/." ]
{% if ARCH_HASHES or ARCH_DIFFS %}
- [ "{{ EXPDIR }}/git_info.log", "{{ ROTDIR }}/{{ copy_expdir }}/." ]
{% endif %}
target: "{{ ATARDIR }}/{{ cycle_YMDH }}/expdir.tar"
required:
- "{{ copy_expdir }}/config.*"
- "{{ copy_expdir }}/{{ PSLOT }}.db"
- "{{ copy_expdir }}/{{ PSLOT }}.xml"
{% if ARCH_HASHES or ARCH_DIFFS %}
- "{{ copy_expdir }}/git_info.log"
{% endif %}
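
For orientation, here is a minimal sketch of how this template expands for a single cycle using plain Jinja2. The to_YMDH filter and the glob global are stand-ins for the utilities that wxflow's parse_j2yaml normally provides, and the paths, cycle date, and experiment name are made up; only the staging/copy portion of the template is reproduced.

```python
# Minimal sketch of how expdir.yaml.j2 expands for one cycle (plain Jinja2).
# to_YMDH and glob are stand-ins for the wxflow utilities injected by parse_j2yaml;
# all paths and the cycle date are hypothetical.
from datetime import datetime

import jinja2

template_str = """\
{% set cycle_YMDH = current_cycle | to_YMDH %}
{% set copy_expdir = "expdir." ~ cycle_YMDH %}
expdir:
  name: "EXPDIR"
  FileHandler:
    mkdir:
      - "{{ ROTDIR }}/{{ copy_expdir }}"
    copy:
{% for config in glob(EXPDIR ~ "/config.*") %}
      - [ "{{ config }}", "{{ ROTDIR }}/{{ copy_expdir }}/." ]
{% endfor %}
  target: "{{ ATARDIR }}/{{ cycle_YMDH }}/expdir.tar"
"""

env = jinja2.Environment(trim_blocks=True, lstrip_blocks=True)
env.filters["to_YMDH"] = lambda dt: dt.strftime("%Y%m%d%H")      # stand-in for wxflow's to_YMDH
env.globals["glob"] = lambda pattern: ["/exp/test/config.base"]  # stand-in filesystem glob

print(env.from_string(template_str).render(
    current_cycle=datetime(2024, 11, 15, 0),
    EXPDIR="/exp/test", ROTDIR="/scratch/rotdir", ATARDIR="/archive/test"))
```

The rendered YAML is what the FileHandler and tarball steps in archive.py consume: the configs are staged into a per-cycle expdir.<YMDH> directory under ROTDIR and then written to expdir.tar under ATARDIR.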
9 changes: 8 additions & 1 deletion parm/archive/master_gdas.yaml.j2
@@ -40,7 +40,7 @@ datasets:
# Determine if we will save restart ICs or not (only valid for cycled)
{% set save_warm_start_forecast, save_warm_start_cycled = ( False, False ) %}

{% if ARCH_CYC == cycle_HH | int%}
{% if ARCH_CYC == cycle_HH | int %}
# Save the forecast-only cycle ICs every ARCH_WARMICFREQ or ARCH_FCSTICFREQ days
{% if (current_cycle - SDATE).days % ARCH_WARMICFREQ == 0 %}
{% set save_warm_start_forecast = True %}
@@ -97,3 +97,10 @@ datasets:

# End of restart checking
{% endif %}

# Archive the EXPDIR if requested
{% if archive_expdir %}
{% filter indent(width=4) %}
{% include "expdir.yaml.j2" %}
{% endfilter %}
{% endif %}
Comment on lines +101 to +106
Contributor

Should only need this in one of gdas or gfs, but there is a bit of a coordination problem since either can be run without the other.

Contributor Author

Agreed. I added a method to archive.py to determine which RUN to archive this in and set the archive_expdir boolean accordingly.
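
For readers following the coordination question above, here is a minimal standalone sketch of the RUN-selection guard that the archive.py changes later in this diff add. The helper name and plain-argument interface are illustrative only; the actual logic lives in Archive._archive_expdir and reads NET, MODE, and RUN from the task configuration.

```python
# Illustrative sketch of the RUN-selection guard added in ush/python/pygfs/task/archive.py.
# The function name and signature are hypothetical; the real check is Archive._archive_expdir.

def should_archive_expdir(net: str, mode: str, run: str) -> bool:
    """Return True if this RUN is responsible for archiving the EXPDIR."""
    # In cycled gfs experiments, only the gdas RUN archives the EXPDIR, so the
    # gfs and enkf archive jobs never race it for the same tarball.
    if net == "gfs" and mode == "cycled" and run != "gdas":
        return False
    # Forecast-only experiments and gefs have a single archiving RUN, so no conflict.
    return True


print(should_archive_expdir("gfs", "cycled", "gfs"))            # False: defer to gdas
print(should_archive_expdir("gfs", "cycled", "gdas"))           # True
print(should_archive_expdir("gefs", "forecast-only", "gefs"))   # True
```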

7 changes: 7 additions & 0 deletions parm/archive/master_gefs.yaml.j2
@@ -10,3 +10,10 @@ datasets:
{% include "gefs_extracted_ice.yaml.j2" %}
{% include "gefs_extracted_wave.yaml.j2" %}
{% endfilter %}

# Archive the EXPDIR if requested
{% if archive_expdir %}
{% filter indent(width=4) %}
{% include "expdir.yaml.j2" %}
{% endfilter %}
{% endif %}
7 changes: 7 additions & 0 deletions parm/archive/master_gfs.yaml.j2
@@ -98,3 +98,10 @@ datasets:
{% endfilter %}
{% endif %}
{% endif %}

# Archive the EXPDIR if requested
{% if archive_expdir %}
{% filter indent(width=4) %}
{% include "expdir.yaml.j2" %}
{% endfilter %}
{% endif %}
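
As an aside on the include pattern used in all three master templates, the indent filter is what keeps the included expdir entry nested under the parent datasets mapping. A small sketch with plain Jinja2 and a made-up child snippet:

```python
# Small sketch of the {% filter indent(width=4) %} pattern wrapped around the include:
# without the filter, the included YAML would start at column 0 and fall outside the
# parent "datasets:" mapping. Plain Jinja2; the child snippet is made up.
import jinja2

child = 'expdir:\n  name: "EXPDIR"\n'

template = jinja2.Template(
    "datasets:\n"
    "    {% filter indent(width=4) %}{{ child }}{% endfilter %}"
)
print(template.render(child=child))
# datasets:
#     expdir:
#       name: "EXPDIR"
```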
8 changes: 6 additions & 2 deletions parm/config/gefs/config.base
@@ -326,9 +326,13 @@ if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then
echo "Both HPSS and local archiving selected. Please choose one or the other."
exit 3
fi
export ARCH_CYC=00 # Archive data at this cycle for warm_start capability
export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability
export ARCH_CYC=00 # Archive data at this cycle for warm start and/or forecast-only capabilities
export ARCH_WARMICFREQ=4 # Archive frequency in days for warm start capability
export ARCH_FCSTICFREQ=1 # Archive frequency in days for gdas and gfs forecast-only capability
export ARCH_EXPDIR='YES' # Archive the EXPDIR configs, XML, and database
export ARCH_EXPDIR_FREQ=0 # How often to archive the EXPDIR in hours or 0 for first and last cycle only
export ARCH_HASHES='YES' # Archive the hashes of the GW and submodules and 'git status' for each; requires ARCH_EXPDIR
export ARCH_DIFFS='NO' # Archive the output of 'git diff' for the GW; requires ARCH_EXPDIR

export DELETE_COM_IN_ARCHIVE_JOB="YES" # NO=retain ROTDIR. YES default in arch.sh and earc.sh.

8 changes: 6 additions & 2 deletions parm/config/gfs/config.base
@@ -476,9 +476,13 @@ if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then
echo "FATAL ERROR: Both HPSS and local archiving selected. Please choose one or the other."
exit 4
fi
export ARCH_CYC=00 # Archive data at this cycle for warm_start capability
export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability
export ARCH_CYC=00 # Archive data at this cycle for warm start and/or forecast-only capabilities
export ARCH_WARMICFREQ=4 # Archive frequency in days for warm start capability
export ARCH_FCSTICFREQ=1 # Archive frequency in days for gdas and gfs forecast-only capability
export ARCH_EXPDIR='YES' # Archive the EXPDIR configs, XML, and database
export ARCH_EXPDIR_FREQ=0 # How often to archive the EXPDIR in hours or 0 for first and last cycle only
export ARCH_HASHES='YES' # Archive the hashes of the GW and submodules and 'git status' for each; requires ARCH_EXPDIR
export ARCH_DIFFS='NO' # Archive the output of 'git diff' for the GW; requires ARCH_EXPDIR

# The monitor jobs are not yet supported for JEDIATMVAR.
if [[ ${DO_JEDIATMVAR} = "YES" ]]; then
26 changes: 13 additions & 13 deletions scripts/exglobal_archive.py
@@ -3,7 +3,7 @@
import os

from pygfs.task.archive import Archive
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit
from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit, chdir

# initialize root logger
logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True)
@@ -32,7 +32,8 @@ def main():
'DO_AERO_ANL', 'DO_AERO_FCST', 'DOIBP_WAV', 'DO_JEDIOCNVAR',
'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS', 'waveGRD',
'IAUFHRS', 'DO_FIT2OBS', 'NET', 'FHOUT_HF_GFS', 'FHMAX_HF_GFS', 'REPLAY_ICS',
'OFFSET_START_HOUR']
'OFFSET_START_HOUR', 'ARCH_EXPDIR', 'EXPDIR', 'ARCH_EXPDIR_FREQ', 'ARCH_HASHES',
'ARCH_DIFFS', 'SDATE', 'EDATE', 'HOMEgfs']

archive_dict = AttrDict()
for key in keys:
@@ -47,21 +48,20 @@ def main():
if archive_dict[key] is None:
print(f"Warning: key ({key}) not found in task_config!")

cwd = os.getcwd()
with chdir(config.ROTDIR):

os.chdir(config.ROTDIR)
# Determine which archives to create
arcdir_set, atardir_sets = archive.configure(archive_dict)

# Determine which archives to create
arcdir_set, atardir_sets = archive.configure(archive_dict)
# Populate the product archive (ARCDIR)
archive.execute_store_products(arcdir_set)

# Populate the product archive (ARCDIR)
archive.execute_store_products(arcdir_set)
# Create the backup tarballs and store in ATARDIR
for atardir_set in atardir_sets:
archive.execute_backup_dataset(atardir_set)

# Create the backup tarballs and store in ATARDIR
for atardir_set in atardir_sets:
archive.execute_backup_dataset(atardir_set)

os.chdir(cwd)
# Clean up any temporary files
archive.clean()


if __name__ == '__main__':
179 changes: 174 additions & 5 deletions ush/python/pygfs/task/archive.py
@@ -7,10 +7,11 @@
from logging import getLogger
from typing import Any, Dict, List

from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task,
chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, strftime,
to_YMDH)
from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, to_timedelta,
chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, rmdir,
strftime, to_YMDH, which, chdir, ProcessError)

git_filename = "git_info.log"
logger = getLogger(__name__.split('.')[-1])


@@ -43,6 +44,9 @@ def __init__(self, config: Dict[str, Any]) -> None:
# Extend task_config with path_dict
self.task_config = AttrDict(**self.task_config, **path_dict)

# Boolean used for cleanup if the EXPDIR was archived
self.archive_expdir = False

@logit(logger)
def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]):
"""Determine which tarballs will need to be created.
@@ -109,6 +113,16 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str
self.tar_cmd = ""
return arcdir_set, []

# Determine if we are archiving the EXPDIR this cycle
if arch_dict.ARCH_EXPDIR:
self.archive_expdir = Archive._archive_expdir(arch_dict)
arch_dict.archive_expdir = self.archive_expdir

if self.archive_expdir:
# If requested, get workflow hashes/statuses/diffs for EXPDIR archiving
if arch_dict.ARCH_HASHES or arch_dict.ARCH_DIFFS:
Archive._pop_git_info(arch_dict)

master_yaml = "master_" + arch_dict.RUN + ".yaml.j2"

parsed_sets = parse_j2yaml(os.path.join(archive_parm, master_yaml),
@@ -195,6 +209,12 @@ def _create_fileset(atardir_set: Dict[str, Any]) -> List:
"""

fileset = []
# Check if any external files need to be brought into the ROTDIR (i.e. EXPDIR contents)
if "FileHandler" in atardir_set:
# Run the file handler to stage files for archiving
FileHandler(atardir_set["FileHandler"]).sync()

# Check that all required files are present and add them to the list of files to archive
if "required" in atardir_set:
if atardir_set.required is not None:
for item in atardir_set.required:
Expand All @@ -204,6 +224,7 @@ def _create_fileset(atardir_set: Dict[str, Any]) -> List:
for entry in glob_set:
fileset.append(entry)

# Check for optional files and add found items to the list of files to archive
if "optional" in atardir_set:
if atardir_set.optional is not None:
for item in atardir_set.optional:
@@ -244,7 +265,7 @@ def _has_rstprod(fileset: List) -> bool:
return False

@logit(logger)
def _protect_rstprod(self, atardir_set: Dict[str, any]) -> None:
def _protect_rstprod(self, atardir_set: Dict[str, Any]) -> None:
"""
Changes the group of the target tarball to rstprod and the permissions to
640. If this fails for any reason, attempt to delete the file before exiting.
@@ -289,7 +310,7 @@ def _create_tarball(target: str, fileset: List) -> None:
tarball.add(filename)

@logit(logger)
def _gen_relative_paths(self, root_path: str) -> Dict:
def _gen_relative_paths(self, root_path: str) -> Dict[str, Any]:
"""Generate a dict of paths in self.task_config relative to root_path

Parameters
@@ -417,3 +438,151 @@ def replace_string_from_to_file(filename_in, filename_out, search_str, replace_s
replace_string_from_to_file(in_track_p_file, out_track_p_file, "AVNO", pslot4)

return

@staticmethod
@logit(logger)
def _archive_expdir(arch_dict: Dict[str, Any]) -> bool:
"""
This function checks whether the EXPDIR should be archived this RUN/cycle
and returns True if so. The EXPDIR contents are then copied to a temporary
directory in the ROTDIR for archiving.

Parameters
----------
arch_dict: Dict
Dictionary with required parameters, including the following:

current_cycle: Datetime
Date of the current cycle.
SDATE: Datetime
Starting cycle date.
EDATE: Datetime
Ending cycle date.
NET: str
The workflow type (gfs or gefs)
ARCH_EXPDIR_FREQ: int
Frequency to perform EXPDIR archiving
ROTDIR: str
Full path to the ROTDIR
"""

# Get commonly used variables
current_cycle = arch_dict.current_cycle
sdate = arch_dict.SDATE
edate = arch_dict.EDATE
mode = arch_dict.MODE
assim_freq = to_timedelta(f"+{arch_dict.assim_freq}H")
# Convert frequency to seconds from hours
freq = arch_dict.ARCH_EXPDIR_FREQ * 3600

# Skip gfs and enkf cycled RUNs (only archive during gdas RUNs)
# (do not skip forecast-only, regardless of RUN)
if arch_dict.NET == "gfs" and arch_dict.MODE == "cycled" and arch_dict.RUN != "gdas":
return False

# Determine if we should skip this cycle
# If the frequency is set to 0, only run on sdate (+assim_freq for cycled) and edate
if freq == 0:
if mode == "forecast-only" and (current_cycle != sdate and current_cycle != edate):
return False
elif mode == "cycled" and (current_cycle != sdate + assim_freq and current_cycle != edate):
return False
# Otherwise, the frequency is in hours
elif mode == "forecast-only" and (sdate - current_cycle).total_seconds() % freq != 0:
return False
elif mode == "cycled" and (sdate + assim_freq - current_cycle).total_seconds() % freq != 0:
return False

# Looks like we are archiving the EXPDIR
return True

@staticmethod
@logit(logger)
def _pop_git_info(arch_dict: Dict[str, Any]) -> Dict[str, Any]:
"""
This function checks the configuration options ARCH_HASHES and ARCH_DIFFS
to determine whether git hashes and/or diffs should be added to the EXPDIR
for archiving and executes the corresponding git commands. The hashes and
diffs are stored in EXPDIR/git_info.log.

Parameters
----------
arch_dict: Dict
Dictionary with required parameters, including the following:

EXPDIR: str
Location of the EXPDIR
HOMEgfs: str
Location of the HOMEgfs (the global workflow)
ARCH_HASHES: bool
Whether to archive git hashes of the workflow and submodules
ARCH_DIFFS: bool
Whether to archive git diffs of the workflow and submodules
"""

# Get commonly used variables
arch_hashes = arch_dict.ARCH_HASHES
arch_diffs = arch_dict.ARCH_DIFFS
homegfs = arch_dict.HOMEgfs
expdir = arch_dict.EXPDIR

# Find the git command
git = which('git')
if git is None:
raise FileNotFoundError("FATAL ERROR: the git command could not be found!")

output = ""
# Navigate to HOMEgfs to run the git commands
with chdir(homegfs):

# Are we running git to get hashes?
if arch_hashes:
output += "Global workflow hash:\n"

try:
output += git("rev-parse", "HEAD", output=str)
output += "\nSubmodule hashes:\n"
output += git("submodule", "status", output=str)
except ProcessError:
raise OSError("FATAL ERROR Failed to run git")

# Are we running git to get diffs?
if arch_diffs:
output += "Global workflow diffs:\n"
# This command will only work on git v2.14+
try:
output += git("diff", "--submodule=diff", output=str)
except ProcessError:
# The version of git may be too old. See if we can run just a surface diff.
try:
output += git("diff", output=str)
print("WARNING git was unable to do a recursive diff.\n"
"Only a top level diff was performed.\n"
"Note that the git version must be >= 2.14 for this feature.")
except ProcessError:
raise OSError("FATAL ERROR Failed to run 'git diff'")

# Write out to the log file
try:
with open(os.path.join(expdir, git_filename), 'w') as output_file:
output_file.write(output)
except OSError:
fname = os.path.join(expdir, git_filename)
raise OSError(f"FATAL ERROR Unable to write git output to '{fname}'")

return

@logit(logger)
def clean(self):
"""
Remove the temporary directories/files created by the Archive task.
Presently, this is only the ROTDIR/expdir directory if EXPDIR archiving
was performed.
"""

if self.archive_expdir:
temp_expdir_path = os.path.join(self.task_config.ROTDIR, "expdir." +
to_YMDH(self.task_config.current_cycle))
rmdir(temp_expdir_path)

return
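
To see the cadence that the new ARCH_EXPDIR_FREQ setting selects, here is a small standalone sketch of the frequency test for a forecast-only experiment. The helper name, the six-hour cycle step, and the dates are made up; the authoritative check is the _archive_expdir method above.

```python
# Standalone sketch of the ARCH_EXPDIR_FREQ cadence for a forecast-only experiment.
# The helper and dates are illustrative; Archive._archive_expdir is the real check.
from datetime import datetime, timedelta


def archive_this_cycle(current, sdate, edate, freq_hours):
    """Return True if the EXPDIR should be archived on this cycle."""
    if freq_hours == 0:
        # 0 means: archive only on the first and last cycles of the experiment
        return current == sdate or current == edate
    # Otherwise archive every freq_hours, counted from the starting cycle
    return (current - sdate).total_seconds() % (freq_hours * 3600) == 0


sdate = datetime(2024, 11, 15, 0)
edate = datetime(2024, 11, 17, 0)
cycles = [sdate + timedelta(hours=6 * n) for n in range(9)]

print([c.strftime("%Y%m%d%H") for c in cycles if archive_this_cycle(c, sdate, edate, 0)])
# ['2024111500', '2024111700'] -> first and last cycles only
print([c.strftime("%Y%m%d%H") for c in cycles if archive_this_cycle(c, sdate, edate, 24)])
# ['2024111500', '2024111600', '2024111700'] -> every 24 hours
```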
1 change: 1 addition & 0 deletions workflow/generate_workflows.sh
@@ -140,6 +140,7 @@
G) _run_all_gfs=true ;;
E) _run_all_gefs=true ;;
S) _run_all_sfs=true ;;
A) _set_account=true && _hpc_account="${OPTARG}" ;;
c) _update_cron=true ;;
e) _email="${OPTARG}" && _set_email=true ;;
t) _tag="_${OPTARG}" ;;