diff --git a/.gitignore b/.gitignore index 713d8a6..5e133cd 100644 --- a/.gitignore +++ b/.gitignore @@ -243,9 +243,14 @@ Temporary Items # Neovim .nvimlog +# IntelliJ /.idea/codeStyles/codeStyleConfig.xml /.idea/misc.xml /.idea/modules.xml /.idea/inspectionProfiles/profiles_settings.xml /.idea/vcs.xml /.idea/PyhDToolkit.iml + +# Other +tst_* + diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c0c717..75e8f64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # `pylhc-submitter` Changelog +## Version 2.0.0 + +- General code cleanup/refactoring/documentation: + - Partly breaks backward compatibility if individual methods of the `job_submitter` functionality have been used. + - Does not affect any setups simply calling the `main()` function of `job_submitter.py` or calling the `job_submitter` as a module. + - Apart from some fixed imports following the new structure, the `autosix` module has been left untouched. + + +- New Feature of `job_submitter`: + - `output_destination` input parameter, which sets an output directory in which the folder-structure + for the jobs will be replicated and into which the job's `job_output_dir` will be copied "manually" at the end of the job, + instead of having the directory transferred back to the `working_directory` by htcondor. + ## Version 1.1.1 - Uses `concat` instead of `append` to stack the DataFrames. diff --git a/doc/Makefile b/doc/Makefile index 08f8749..ff66ee4 100644 --- a/doc/Makefile +++ b/doc/Makefile @@ -48,9 +48,9 @@ html: @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." josch: - $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) /home/jdilly/Software/Documentation/submitter-doc + $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) ../../Documentation/submitter-doc @echo - @echo "Build finished. The HTML pages are in /home/jdilly/Software/Documentation/submitter-doc." + @echo "Build finished. The HTML pages are in ../../Documentation/submitter-doc."
dirhtml: $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml diff --git a/doc/_static/css/custom.css b/doc/_static/css/custom.css index f201296..eea8824 100644 --- a/doc/_static/css/custom.css +++ b/doc/_static/css/custom.css @@ -1,7 +1,41 @@ +:root { + --nav-side-width: 300px; /* default is 300px */ + /* for 100% width */ + /*--nav-content-width: 100%;*/ + /*--local-toc-width: 300px;*/ + /*--nav-content-width-wide: calc(100% - var(--local-toc-width)); /* 100% here is fullscreen */ + /*--local-toc-left: calc(100% - var(--local-toc-width)); /* 100% here is w/o sidebar */ + + /* for fixed widths */ + --nav-content-width: 800px; /* default is 800px */ + --nav-content-width-wide: var(--nav-content-width); + --local-toc-width: calc(100% - var(--nav-content-width-wide)); + --local-toc-left: calc(var(--nav-content-width-wide) + var(--nav-side-width)); +} + +/* main content width */ +.wy-nav-content { + max-width: var(--nav-content-width); +} + +/* Sidebar width */ +.wy-nav-side { + width: var(--nav-side-width); +} + .wy-side-nav-search { background: rgb(243,244,247); } +.wy-side-nav-search > a { + color: black; +} + +.wy-side-nav-search> a img.logo { + width: 50%; +} + + .wy-side-nav-search > div.version { color: black; } @@ -182,3 +216,60 @@ em.sig-param span.default_value { .rst-content table.field-list th { padding: 16px; } + + +/* Create local table of contents + ------------------------------ + inspired by https://github.com/readthedocs/sphinx_rtd_theme/pull/919 + and https://github.com/readthedocs/sphinx_rtd_theme/issues/764 + see also _templates/layout.html + */ + +#local-table-of-contents { + padding-bottom: 20px; + /* display: none; */ +} + +/* Mask entry of main header (chapter) */ +#local-table-of-contents a[href="#"]{ + /*display: none;*/ +} + +/* indent subsections */ +#local-table-of-contents ul > ul { + padding-left: 0px; + margin-left: 20px; + padding-right: 0; + padding-bottom: 5px; +} + + +#local-table-of-contents-title { + margin-bottom: 10px; +} + +/* Show in Sidebar if window width is larger than nav-side + nav-content + toc-width */ +@media screen and (min-width: 1200px) { + .wy-nav-content { + max-width: var(--nav-content-width-wide); + } + + #local-table-of-contents { + display: block; + position: fixed; + margin-left: 15px; + overflow-y: auto; + height: 95%; + top: 45px; + left: var(--local-toc-left); + width: var(--local-toc-width); + } + + #local-table-of-contents-title { + display: block; + font-size: 16px; + width: 100%; + padding-top: 10px; + padding-bottom: 5px; + } +} \ No newline at end of file diff --git a/doc/_templates/layout.html b/doc/_templates/layout.html new file mode 100644 index 0000000..aa67d6d --- /dev/null +++ b/doc/_templates/layout.html @@ -0,0 +1,12 @@ +{% extends "!layout.html" %} +{% block document %} + {%- if toc|length > title|length + 75 %} + + {%- endif %} + + {{ super() }} +{% endblock %} + diff --git a/doc/conf.py b/doc/conf.py index 6fbff20..283ad0a 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -66,9 +66,11 @@ def about_package(init_posixpath: pathlib.Path) -> dict: "sphinx.ext.githubpages", "sphinx.ext.napoleon", ] +autosectionlabel_prefix_document = True +autosectionlabel_maxdepth = 2 # Add any paths that contain templates here, relative to this directory. -# templates_path = ['_templates'] +templates_path = ['_templates'] # The suffix(es) of source filenames. 
# You can specify multiple suffix as a list of string: @@ -84,6 +86,11 @@ def about_package(init_posixpath: pathlib.Path) -> dict: copyright_ = "2019, pyLHC/OMC-TEAM" author = ABOUT_PYLHC_SUBMITTER["__author__"] +# Override link in 'Edit on Github' +rst_prolog = f""" +:github_url: {ABOUT_PYLHC_SUBMITTER['__url__']} +""" + # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. @@ -98,7 +105,7 @@ def about_package(init_posixpath: pathlib.Path) -> dict: # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. -language = None +language = 'en' # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. diff --git a/doc/entrypoints/autosix.rst b/doc/entrypoints/autosix.rst index 470e586..0276dfc 100644 --- a/doc/entrypoints/autosix.rst +++ b/doc/entrypoints/autosix.rst @@ -1,2 +1,3 @@ .. automodule:: pylhc_submitter.autosix :members: + :noindex: diff --git a/doc/entrypoints/job_submitter.rst b/doc/entrypoints/job_submitter.rst index 7854e56..675569c 100644 --- a/doc/entrypoints/job_submitter.rst +++ b/doc/entrypoints/job_submitter.rst @@ -1,2 +1,3 @@ .. automodule:: pylhc_submitter.job_submitter :members: + :noindex: diff --git a/doc/modules/constants.rst b/doc/modules/constants.rst index a0a2e36..8474b25 100644 --- a/doc/modules/constants.rst +++ b/doc/modules/constants.rst @@ -1,15 +1,25 @@ Constants Definitions -************************** +********************* .. automodule:: pylhc_submitter.constants.general :members: + :noindex: .. automodule:: pylhc_submitter.constants.external_paths :members: + :noindex: +.. automodule:: pylhc_submitter.constants.job_submitter + :members: + :noindex: + +.. automodule:: pylhc_submitter.constants.htcondor + :members: + :noindex: .. automodule:: pylhc_submitter.constants.autosix :members: + :noindex: diff --git a/doc/modules/htc.rst b/doc/modules/htc.rst deleted file mode 100644 index f4965b0..0000000 --- a/doc/modules/htc.rst +++ /dev/null @@ -1,9 +0,0 @@ -HTCondor Tools -************************** - -.. automodule:: pylhc_submitter.htc.utils - :members: - - -.. automodule:: pylhc_submitter.htc.mask - :members: diff --git a/doc/modules/sixdesk_tools.rst b/doc/modules/sixdesk_tools.rst index 072ad58..0271a84 100644 --- a/doc/modules/sixdesk_tools.rst +++ b/doc/modules/sixdesk_tools.rst @@ -3,21 +3,28 @@ Sixdesk Tools .. automodule:: pylhc_submitter.sixdesk_tools.stages :members: + :noindex: .. automodule:: pylhc_submitter.sixdesk_tools.create_workspace :members: + :noindex: .. automodule:: pylhc_submitter.sixdesk_tools.submit :members: + :noindex: .. automodule:: pylhc_submitter.sixdesk_tools.post_process_da :members: + :noindex: .. automodule:: pylhc_submitter.sixdesk_tools.extract_data_from_db :members: + :noindex: .. automodule:: pylhc_submitter.sixdesk_tools.utils :members: + :noindex: .. automodule:: pylhc_submitter.sixdesk_tools.troubleshooting :members: + :noindex: diff --git a/doc/modules/submitter.rst b/doc/modules/submitter.rst new file mode 100644 index 0000000..ac54d63 --- /dev/null +++ b/doc/modules/submitter.rst @@ -0,0 +1,19 @@ +Submitter +********* + +.. automodule:: pylhc_submitter.submitter.htc_utils + :members: + :noindex: + + +.. automodule:: pylhc_submitter.submitter.iotools + :members: + :noindex: + +.. 
automodule:: pylhc_submitter.submitter.mask + :members: + :noindex: + +.. automodule:: pylhc_submitter.submitter.runners + :members: + :noindex: diff --git a/doc/modules/utils.rst b/doc/modules/utils.rst index aa7ecb0..107fb06 100644 --- a/doc/modules/utils.rst +++ b/doc/modules/utils.rst @@ -3,7 +3,9 @@ Utilities .. automodule:: pylhc_submitter.utils.iotools :members: + :noindex: .. automodule:: pylhc_submitter.utils.logging_tools :members: + :noindex: diff --git a/pylhc_submitter/__init__.py b/pylhc_submitter/__init__.py index 62de331..ed21d9e 100644 --- a/pylhc_submitter/__init__.py +++ b/pylhc_submitter/__init__.py @@ -10,7 +10,7 @@ __title__ = "pylhc_submitter" __description__ = "pylhc-submitter contains scripts to simplify the creation and submission of jobs to HTCondor at CERN" __url__ = "https://github.com/pylhc/submitter" -__version__ = "1.1.1" +__version__ = "2.0.0" __author__ = "pylhc" __author_email__ = "pylhc@github.com" __license__ = "MIT" diff --git a/pylhc_submitter/autosix.py b/pylhc_submitter/autosix.py index f1f908c..09b2903 100644 --- a/pylhc_submitter/autosix.py +++ b/pylhc_submitter/autosix.py @@ -193,34 +193,17 @@ import numpy as np import tfs -from generic_parser import EntryPointParameters, entrypoint, DotDict +from generic_parser import EntryPointParameters, entrypoint from generic_parser.entry_datatypes import DictAsString -from pylhc_submitter.constants.autosix import ( - HEADER_BASEDIR, - SIXENV_REQUIRED, - SIXENV_OPTIONAL, - AutoSixEnvironment, -) -from pylhc_submitter.htc.mask import generate_jobdf_index -from pylhc_submitter.job_submitter import ( - JOBSUMMARY_FILE, - COLUMN_JOBID, -) -from pylhc_submitter.sixdesk_tools.create_workspace import ( - set_max_materialize -) -from pylhc_submitter.sixdesk_tools.stages import Stage, STAGE_ORDER -from pylhc_submitter.sixdesk_tools.utils import ( - is_locked, - check_mask, -) -from pylhc_submitter.utils.iotools import ( - PathOrStr, - save_config, - make_replace_entries_iterable, - keys_to_path -) +from pylhc_submitter.constants.autosix import (HEADER_BASEDIR, SIXENV_OPTIONAL, SIXENV_REQUIRED, + AutoSixEnvironment) +from pylhc_submitter.constants.job_submitter import COLUMN_JOBID, JOBSUMMARY_FILE +from pylhc_submitter.submitter.mask import generate_jobdf_index +from pylhc_submitter.sixdesk_tools.stages import STAGE_ORDER, Stage +from pylhc_submitter.sixdesk_tools.utils import check_mask, is_locked +from pylhc_submitter.utils.iotools import (PathOrStr, keys_to_path, make_replace_entries_iterable, + save_config) from pylhc_submitter.utils.logging_tools import log_setup LOG = logging.getLogger(__name__) diff --git a/pylhc_submitter/constants/autosix.py b/pylhc_submitter/constants/autosix.py index 39d0945..b91065d 100644 --- a/pylhc_submitter/constants/autosix.py +++ b/pylhc_submitter/constants/autosix.py @@ -1,6 +1,6 @@ """ -Constants: Autosix ----------------------------------- +Autosix +------- Collections of constants and paths used in autosix. diff --git a/pylhc_submitter/constants/external_paths.py b/pylhc_submitter/constants/external_paths.py index d470cbf..2e7e9e2 100644 --- a/pylhc_submitter/constants/external_paths.py +++ b/pylhc_submitter/constants/external_paths.py @@ -1,6 +1,6 @@ """ -Constants: External Paths -------------------------- +External Paths +-------------- Specific constants relating to external paths to be used, to help with consistency. 
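The module renames above (``pylhc_submitter.htc`` becoming ``pylhc_submitter.submitter``, and the job-submitter constants moving into ``pylhc_submitter.constants.job_submitter``) are what the changelog describes as the partial break in backward compatibility. A minimal sketch of how downstream imports might need to change, assuming only the names visible in this diff:

```python
# Sketch only: import paths as they appear in this diff; other names may differ.

# pylhc-submitter 1.x layout (old)
# from pylhc_submitter.htc.utils import write_bash, make_subfile
# from pylhc_submitter.htc.mask import create_jobs_from_mask
# from pylhc_submitter.job_submitter import JOBSUMMARY_FILE, COLUMN_JOBID

# pylhc-submitter 2.0.0 layout (new)
from pylhc_submitter.submitter.htc_utils import write_bash, make_subfile
from pylhc_submitter.submitter.mask import create_job_scripts_from_mask  # renamed from create_jobs_from_mask
from pylhc_submitter.constants.job_submitter import JOBSUMMARY_FILE, COLUMN_JOBID
```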
diff --git a/pylhc_submitter/constants/general.py b/pylhc_submitter/constants/general.py index 59f796d..2c68d93 100644 --- a/pylhc_submitter/constants/general.py +++ b/pylhc_submitter/constants/general.py @@ -1,6 +1,6 @@ """ -Constants: General ------------------- +General +------- General constants to help with consistency. """ diff --git a/pylhc_submitter/constants/htcondor.py b/pylhc_submitter/constants/htcondor.py new file mode 100644 index 0000000..8e9ac24 --- /dev/null +++ b/pylhc_submitter/constants/htcondor.py @@ -0,0 +1,24 @@ +""" +HTCondor +-------- + +Constants for the HTCondor parameters. +""" +SHEBANG = "#!/bin/bash" +SUBFILE = "queuehtc.sub" +BASH_FILENAME = "Job" + +HTCONDOR_JOBLIMIT = 100000 + +CMD_SUBMIT = "condor_submit" +JOBFLAVOURS = ( + "espresso", # 20 min + "microcentury", # 1 h + "longlunch", # 2 h + "workday", # 8 h + "tomorrow", # 1 d + "testmatch", # 3 d + "nextweek", # 1 w +) + +NOTIFICATIONS = ("always", "complete", "error", "never") \ No newline at end of file diff --git a/pylhc_submitter/constants/job_submitter.py b/pylhc_submitter/constants/job_submitter.py new file mode 100644 index 0000000..93c1236 --- /dev/null +++ b/pylhc_submitter/constants/job_submitter.py @@ -0,0 +1,33 @@ + +""" +Job Submitter +------------- + +Collections of constants and paths used in the job-submitter. +""" +from pylhc_submitter.constants.external_paths import MADX_BIN, PYTHON2_BIN, PYTHON3_BIN + +JOBSUMMARY_FILE = "Jobs.tfs" +JOBDIRECTORY_PREFIX = "Job" +CONFIG_FILE = "config.ini" + +SCRIPT_EXTENSIONS = { + "madx": ".madx", + "python3": ".py", + "python2": ".py", +} + +EXECUTEABLEPATH = { + "madx": MADX_BIN, + "python3": PYTHON3_BIN, + "python2": PYTHON2_BIN, +} + + +COLUMN_JOBID = "JobId" +COLUMN_SHELL_SCRIPT = "ShellScript" +COLUMN_JOB_DIRECTORY = "JobDirectory" +COLUMN_DEST_DIRECTORY = "DestDirectory" +COLUMN_JOB_FILE = "JobFile" + +NON_PARAMETER_COLUMNS = (COLUMN_SHELL_SCRIPT, COLUMN_JOB_DIRECTORY, COLUMN_JOB_FILE, COLUMN_DEST_DIRECTORY) \ No newline at end of file diff --git a/pylhc_submitter/job_submitter.py b/pylhc_submitter/job_submitter.py index dda9aa3..6efe9b8 100644 --- a/pylhc_submitter/job_submitter.py +++ b/pylhc_submitter/job_submitter.py @@ -14,124 +14,173 @@ and job directory for further post processing. For additional information and guides, see the `Job Submitter page -`_ in the ``OMC`` documentation site. +`_ in the ``OMC`` documentation site. + *--Required--* -- **mask** *(str)*: Program mask to use +- **mask** *(PathOrStr)*: + + Program mask to use + + +- **replace_dict** *(DictAsString)*: + + Dict containing the str to replace as keys and values a list of + parameters to replace + -- **replace_dict** *(DictAsString)*: Dict containing the str to replace as - keys and values a list of parameters to replace +- **working_directory** *(PathOrStr)*: -- **working_directory** *(str)*: Directory where data should be put + Directory where data should be put *--Optional--* -- **append_jobs**: Flag to rerun job with finer/wider grid, - already existing points will not be reexecuted. +- **append_jobs**: + + Flag to rerun job with finer/wider grid, already existing points will + not be reexecuted. + + action: ``store_true`` + + +- **check_files** *(str)*: + + List of files/file-name-masks expected to be in the 'job_output_dir' + after a successful job (for appending/resuming). Uses the 'glob' + function, so unix-wildcards (*) are allowed. If not given, only the + presence of the folder itself is checked. 
+ + +- **dryrun**: + + Flag to only prepare folders and scripts, but does not start/submit + jobs. Together with `resume_jobs` this can be used to check which jobs + succeeded and which failed. + + action: ``store_true`` + + +- **executable** *(PathOrStr)*: + + Path to executable or job-type (of ['madx', 'python3', 'python2']) to + use. + + default: ``madx`` + + +- **htc_arguments** *(DictAsString)*: + + Additional arguments for htcondor, as Dict-String. For AccountingGroup + please use 'accounting_group'. 'max_retries' and 'notification' have + defaults (if not given). Others are just passed on. + + default: ``{}`` + + +- **job_output_dir** *(str)*: + + The name of the output dir of the job. (Make sure your script puts its + data there!) + + default: ``Outputdata`` + + +- **jobflavour** *(str)*: + + Jobflavour to give rough estimate of runtime of one job + + choices: ``('espresso', 'microcentury', 'longlunch', 'workday', 'tomorrow', 'testmatch', 'nextweek')`` + + default: ``workday`` + + +- **jobid_mask** *(str)*: + + Mask to name jobs from replace_dict + + +- **num_processes** *(int)*: + + Number of processes to be used if run locally + + default: ``4`` + + +- **output_destination** *(PathOrStr)*: + + Directory to copy the output of the jobs to, sorted into folders per job. + Can be on EOS, preferably via EOS-URI format ('root://eosuser.cern.ch//eos/...'). + + +- **resume_jobs**: + + Only do jobs that did not work. + + action: ``store_true`` - Action: ``store_true`` -- **check_files** *(str)*: List of files/file-name-masks expected to be in the - 'job_output_dir' after a successful job (for appending/resuming). Uses the 'glob' - function, so unix-wildcards (*) are allowed. If not given, only the presence of the folder itself is checked. -- **dryrun**: Flag to only prepare folders and scripts, - but does not start/submit jobs. - Together with `resume_jobs` this can be use to check which jobs succeeded and which failed. - Action: ``store_true`` -- **executable** *(str)*: Path to executable or job-type (of ['madx', 'python3', 'python2']) to use. +- **run_local**: -- **htc_arguments** *(DictAsString)*: Additional arguments for htcondor, as Dict-String. - For AccountingGroup please use 'accounting_group'. 'max_retries' and 'notification' have defaults (if not given). - Others are just passed on. + Flag to run the jobs on the local machine. Not suggested. - Default: ``{}`` -- **job_output_dir** *(str)*: The name of the output dir of the job. (Make sure your script puts its data there!) + action: ``store_true`` - Default: ``Outputdata`` -- **jobflavour** *(str)*: Jobflavour to give rough estimate of runtime of one job - Choices: ``('espresso', 'microcentury', 'longlunch', 'workday', 'tomorrow', 'testmatch', 'nextweek')`` - Default: ``workday`` -- **jobid_mask** *(str)*: Mask to name jobs from replace_dict +- **script_arguments** *(DictAsString)*: -- **num_processes** *(int)*: Number of processes to be used if run locally + Additional arguments to pass to the script, as dict in key-value pairs + ('--' need to be included in the keys). - Default: ``4`` -- **resume_jobs**: Only do jobs that did not work. + default: ``{}`` - Action: ``store_true`` -- **run_local**: Flag to run the jobs on the local machine. Not suggested. - Action: ``store_true`` -- **script_arguments** *(DictAsString)*: Additional arguments to pass to the script, - as dict in key-value pairs ('--' need to be included in the keys).
+- **script_extension** *(str)*: - Default: ``{}`` -- **script_extension** *(str)*: New extension for the scripts created from the masks. - This is inferred automatically for ['madx', 'python3', 'python2']. Otherwise not changed. + New extension for the scripts created from the masks. This is inferred + automatically for ['madx', 'python3', 'python2']. Otherwise not + changed. -- **ssh** *(str)*: Run htcondor from this machine via ssh (needs access to the `working_directory`) + +- **ssh** *(str)*: + + Run htcondor from this machine via ssh (needs access to the + `working_directory`) -:author: mihofer, jdilly, fesoubel """ -import itertools import logging -import multiprocessing -import subprocess import sys +from dataclasses import fields from pathlib import Path -import numpy as np -import tfs from generic_parser import EntryPointParameters, entrypoint from generic_parser.entry_datatypes import DictAsString from generic_parser.tools import print_dict_tree -import pylhc_submitter.htc.utils as htcutils -from pylhc_submitter.htc.mask import ( - check_percentage_signs_in_mask, - create_jobs_from_mask, - find_named_variables_in_mask, - generate_jobdf_index, -) -from pylhc_submitter.htc.utils import ( - COLUMN_JOB_DIRECTORY, - COLUMN_SHELL_SCRIPT, - EXECUTEABLEPATH, - HTCONDOR_JOBLIMIT, - JOBFLAVOURS, -) -from pylhc_submitter.utils.environment_tools import on_windows -from pylhc_submitter.utils.iotools import PathOrStr, save_config, make_replace_entries_iterable, keys_to_path +from pylhc_submitter.constants.htcondor import JOBFLAVOURS +from pylhc_submitter.constants.job_submitter import EXECUTEABLEPATH, SCRIPT_EXTENSIONS +from pylhc_submitter.submitter.iotools import CreationOpts, create_jobs, is_eos_uri, print_stats +from pylhc_submitter.submitter.mask import (check_percentage_signs_in_mask, + find_named_variables_in_mask, is_mask_file) +from pylhc_submitter.submitter.runners import RunnerOpts, run_jobs +from pylhc_submitter.utils.iotools import (PathOrStr, keys_to_path, make_replace_entries_iterable, + save_config) from pylhc_submitter.utils.logging_tools import log_setup -JOBSUMMARY_FILE = "Jobs.tfs" -JOBDIRECTORY_PREFIX = "Job" -COLUMN_JOBID = "JobId" -CONFIG_FILE = "config.ini" - -SCRIPT_EXTENSIONS = { - "madx": ".madx", - "python3": ".py", - "python2": ".py", -} - LOG = logging.getLogger(__name__) try: import htcondor - HAS_HTCONDOR = True except ImportError: platform = "macOS" if sys.platform == "darwin" else "windows" LOG.warning( f"htcondor python bindings are linux-only. You can still use job_submitter on {platform}, " "but only for local runs." ) - HAS_HTCONDOR = False + htcondor = None def get_params(): @@ -245,6 +294,12 @@ def get_params(): type=str, default="Outputdata", ) + params.add_parameter( + name="output_destination", + help="Directory to copy the output of the jobs to, sorted into folders per job. 
" + "Can be on EOS, preferrably via EOS-URI format ('root://eosuser.cern.ch//eos/...').", + type=PathOrStr, + ) params.add_parameter( name="htc_arguments", help=( @@ -273,214 +328,18 @@ def main(opt): else: LOG.info("Starting Job-submitter.") - opt = _check_opts(opt) - save_config(opt.working_directory, opt, "job_submitter") - - job_df = _create_jobs( - opt.working_directory, - opt.mask, - opt.jobid_mask, - opt.replace_dict, - opt.job_output_dir, - opt.append_jobs, - opt.executable, - opt.script_arguments, - opt.script_extension, - ) - job_df, dropped_jobs = _drop_already_ran_jobs( - job_df, opt.resume_jobs or opt.append_jobs, opt.job_output_dir, opt.check_files - ) + save_config(Path(opt.working_directory), opt, "job_submitter") + creation_opt, runner_opt = check_opts(opt) - if opt.run_local and not opt.dryrun: - _run_local(job_df, opt.num_processes) - else: - _run_htc( - job_df, - opt.working_directory, - opt.job_output_dir, - opt.jobflavour, - opt.ssh, - opt.dryrun, - opt.htc_arguments, - ) - if opt.dryrun: - _print_stats(job_df.index, dropped_jobs) - - -# Main Functions --------------------------------------------------------------- - - -def _create_jobs( - cwd, - mask_path_or_string, - jobid_mask, - replace_dict, - output_dir, - append_jobs, - executable, - script_args, - script_extension, -) -> tfs.TfsDataFrame: - LOG.debug("Creating Jobs.") - values_grid = np.array(list(itertools.product(*replace_dict.values())), dtype=object) - - if append_jobs: - jobfile_path = cwd / JOBSUMMARY_FILE - try: - job_df = tfs.read(str(jobfile_path.absolute()), index=COLUMN_JOBID) - except FileNotFoundError as filerror: - raise FileNotFoundError( - "Cannot append jobs, as no previous jobfile was found at " f"'{jobfile_path}'" - ) from filerror - mask = [elem not in job_df[replace_dict.keys()].values for elem in values_grid] - njobs = mask.count(True) - values_grid = values_grid[mask] - else: - njobs = len(values_grid) - job_df = tfs.TfsDataFrame() - - if njobs == 0: - raise ValueError(f"No (new) jobs found!") - if njobs > HTCONDOR_JOBLIMIT: - LOG.warning( - f"You are attempting to submit an important number of jobs ({njobs})." - "This can be a high stress on your system, make sure you know what you are doing." 
- ) - - LOG.debug(f"Initial number of jobs: {njobs:d}") - data_df = tfs.TfsDataFrame( - index=generate_jobdf_index(job_df, jobid_mask, replace_dict.keys(), values_grid), - columns=list(replace_dict.keys()), - data=values_grid, - ) - job_df = tfs.concat([job_df, data_df], sort=False, how_headers='left') - job_df = _setup_folders(job_df, cwd) - - if htcutils.is_mask_file(mask_path_or_string): - LOG.debug("Creating all jobs from mask.") - script_extension = _get_script_extension(script_extension, executable, mask_path_or_string) - job_df = create_jobs_from_mask( - job_df, mask_path_or_string, replace_dict.keys(), script_extension - ) - - LOG.debug("Creating shell scripts for submission.") - job_df = htcutils.write_bash( - job_df, - output_dir, - executable=executable, - cmdline_arguments=script_args, - mask=mask_path_or_string, - ) - - job_df[COLUMN_JOB_DIRECTORY] = job_df[COLUMN_JOB_DIRECTORY].apply(str) - tfs.write(str(cwd / JOBSUMMARY_FILE), job_df, save_index=COLUMN_JOBID) - return job_df - - -def _drop_already_ran_jobs( - job_df: tfs.TfsDataFrame, drop_jobs: bool, output_dir: str, check_files: str -): - LOG.debug("Dropping already finished jobs, if necessary.") - finished_jobs = [] - if drop_jobs: - finished_jobs = [ - idx - for idx, row in job_df.iterrows() - if _job_was_successful(row, output_dir, check_files) - ] - LOG.info( - f"{len(finished_jobs):d} of {len(job_df.index):d}" - " Jobs have already finished and will be skipped." - ) - job_df = job_df.drop(index=finished_jobs) - return job_df, finished_jobs - - -def _run_local(job_df: tfs.TfsDataFrame, num_processes: int) -> None: - LOG.info(f"Running {len(job_df.index)} jobs locally in {num_processes:d} processes.") - pool = multiprocessing.Pool(processes=num_processes) - res = pool.map(_execute_shell, job_df.iterrows()) - if any(res): - LOG.error("At least one job has failed.") - raise RuntimeError("At least one job has failed. 
Check output logs!") - - -def _run_htc( - job_df: tfs.TfsDataFrame, - cwd: str, - output_dir: str, - flavour: str, - ssh: str, - dryrun: bool, - additional_htc_arguments: DictAsString, -) -> None: - LOG.info(f"Submitting {len(job_df.index)} jobs on htcondor, flavour '{flavour}'.") - LOG.debug("Creating htcondor subfile.") - subfile = htcutils.make_subfile( - cwd, job_df, output_dir=output_dir, duration=flavour, **additional_htc_arguments - ) - if not dryrun: - LOG.debug("Submitting jobs to htcondor.") - htcutils.submit_jobfile(subfile, ssh) + job_df, dropped_jobs = create_jobs(creation_opt) + run_jobs(job_df, runner_opt) -def _get_script_extension(script_extension: str, executable: PathOrStr, mask: PathOrStr) -> str: - if script_extension is not None: - return script_extension - return SCRIPT_EXTENSIONS.get(executable, mask.suffix) + print_stats(job_df.index, dropped_jobs) -# Sub Functions ---------------------------------------------------------------- - - -def _check_htcondor_presence() -> None: - """Checks the ``HAS_HTCONDOR`` variable and raises EnvironmentError if it is ``False``.""" - if not HAS_HTCONDOR: - raise EnvironmentError("htcondor bindings are necessary to run this module.") - - -def _setup_folders(job_df: tfs.TfsDataFrame, working_directory: PathOrStr) -> tfs.TfsDataFrame: - def _return_job_dir(job_id): - return working_directory / f"{JOBDIRECTORY_PREFIX}.{job_id}" - - LOG.debug("Setting up folders: ") - job_df[COLUMN_JOB_DIRECTORY] = [_return_job_dir(id_) for id_ in job_df.index] - - for job_dir in job_df[COLUMN_JOB_DIRECTORY]: - try: - job_dir.mkdir() - except IOError: - LOG.debug(f" failed '{job_dir}' (might already exist).") - else: - LOG.debug(f" created '{job_dir}'.") - return job_df - - -def _job_was_successful(job_row, output_dir, files) -> bool: - output_dir = Path(job_row[COLUMN_JOB_DIRECTORY], output_dir) - success = output_dir.is_dir() and any(output_dir.iterdir()) - if success and files is not None and len(files): - for f in files: - success &= len(list(output_dir.glob(f))) > 0 - return success - - -def _execute_shell(df_row) -> int: - idx, column = df_row - cmd = [] if on_windows() else ["sh"] - - with Path(column[COLUMN_JOB_DIRECTORY], "log.tmp").open("w") as logfile: - process = subprocess.Popen( - cmd + [column[COLUMN_SHELL_SCRIPT]], - shell=on_windows(), - stdout=logfile, - stderr=subprocess.STDOUT, - cwd=column[COLUMN_JOB_DIRECTORY], - ) - return process.wait() - - -def _check_opts(opt): +def check_opts(opt): + """ Checks options and sorts them into job-creation and running parameters. """ LOG.debug("Checking options.") if opt.resume_jobs and opt.append_jobs: raise ValueError("Select either Resume jobs or Append jobs") @@ -491,15 +350,21 @@ def _check_opts(opt): if str(opt.executable) in EXECUTEABLEPATH.keys(): opt.executable = str(opt.executable) - if htcutils.is_mask_file(opt.mask): - mask = Path(opt.mask).read_text() # checks that mask and dir are there - opt["mask"] = Path(opt["mask"]) + if is_mask_file(opt.mask): + mask_content = Path(opt.mask).read_text() # checks that mask and dir are there + opt.mask = Path(opt.mask) else: - mask = opt.mask + mask_content = opt.mask + + if is_eos_uri(opt.output_destination) and not ("://" in opt.output_destination and "//eos" in opt.output_destination): + raise ValueError( + "The 'output_destination' is an EOS-URI but missing '://' or '//eos' (double slashes?). 
" + ) + # Replace dict --- dict_keys = set(opt.replace_dict.keys()) - mask_keys = find_named_variables_in_mask(mask) + mask_keys = find_named_variables_in_mask(mask_content) not_in_mask = dict_keys - mask_keys not_in_dict = mask_keys - dict_keys @@ -519,25 +384,24 @@ def _check_opts(opt): [opt.replace_dict.pop(key) for key in not_in_mask] if len(opt.replace_dict) == 0: raise KeyError("Empty replace-dictionary") - check_percentage_signs_in_mask(mask) + check_percentage_signs_in_mask(mask_content) print_dict_tree(opt, name="Input parameter", print_fun=LOG.debug) opt.replace_dict = make_replace_entries_iterable(opt.replace_dict) - return opt - - -def _print_stats(new_jobs, finished_jobs): - """Print some quick statistics.""" - LOG.info("------------- QUICK STATS ----------------") - LOG.info(f"Jobs total:{len(new_jobs) + len(finished_jobs):d}") - LOG.info(f"Jobs to run: {len(new_jobs):d}") - LOG.info(f"Jobs already finished: {len(finished_jobs):d}") - LOG.info("---------- JOBS TO RUN: NAMES -------------") - for job_name in new_jobs: - LOG.info(job_name) - LOG.info("--------- JOBS FINISHED: NAMES ------------") - for job_name in finished_jobs: - LOG.info(job_name) + + # Create new classes + opt.output_dir = opt.job_output_dir # renaming + + creation = CreationOpts(**{f.name: opt[f.name] for f in fields(CreationOpts)}) + runner = RunnerOpts(**{f.name: opt[f.name] for f in fields(RunnerOpts)}) + runner.output_dir = None if opt.output_destination else opt.output_dir + return creation, runner + + +def _check_htcondor_presence() -> None: + """ Raises an error if htcondor is not installed. """ + if htcondor is None: + raise EnvironmentError("htcondor bindings are necessary to run this module.") # Script Mode ------------------------------------------------------------------ diff --git a/pylhc_submitter/sixdesk_tools/utils.py b/pylhc_submitter/sixdesk_tools/utils.py index 2c629fb..d287eb0 100644 --- a/pylhc_submitter/sixdesk_tools/utils.py +++ b/pylhc_submitter/sixdesk_tools/utils.py @@ -10,7 +10,7 @@ from pylhc_submitter.constants.autosix import SIXDESKLOCKFILE, get_workspace_path from pylhc_submitter.constants.external_paths import SIXDESK_UTILS -from pylhc_submitter.htc.mask import find_named_variables_in_mask +from pylhc_submitter.submitter.mask import find_named_variables_in_mask LOG = logging.getLogger(__name__) diff --git a/pylhc_submitter/htc/__init__.py b/pylhc_submitter/submitter/__init__.py similarity index 100% rename from pylhc_submitter/htc/__init__.py rename to pylhc_submitter/submitter/__init__.py diff --git a/pylhc_submitter/htc/utils.py b/pylhc_submitter/submitter/htc_utils.py similarity index 51% rename from pylhc_submitter/htc/utils.py rename to pylhc_submitter/submitter/htc_utils.py index 80f2bc2..873fd23 100644 --- a/pylhc_submitter/htc/utils.py +++ b/pylhc_submitter/submitter/htc_utils.py @@ -15,68 +15,58 @@ import logging import subprocess from pathlib import Path -from typing import Union +from typing import Any, Dict, List, Union from pandas import DataFrame -from pylhc_submitter.utils.environment_tools import on_windows +from pylhc_submitter.constants.htcondor import (BASH_FILENAME, CMD_SUBMIT, HTCONDOR_JOBLIMIT, + JOBFLAVOURS, NOTIFICATIONS, SHEBANG, SUBFILE) +from pylhc_submitter.constants.job_submitter import (COLUMN_DEST_DIRECTORY, COLUMN_JOB_DIRECTORY, + COLUMN_JOB_FILE, COLUMN_SHELL_SCRIPT, + EXECUTEABLEPATH, NON_PARAMETER_COLUMNS) +from pylhc_submitter.submitter import iotools +from pylhc_submitter.submitter.mask import is_mask_file +from 
pylhc_submitter.utils.environment import on_windows try: import htcondor except ImportError: # will be handled by job_submitter - pass + class htcondor: + """Dummy HTCondor module. To satisfy the typing. """ + Submit: Any = None -from pylhc_submitter.constants.external_paths import MADX_BIN, PYTHON2_BIN, PYTHON3_BIN LOG = logging.getLogger(__name__) -SHEBANG = "#!/bin/bash" -SUBFILE = "queuehtc.sub" -BASH_FILENAME = "Job" - -HTCONDOR_JOBLIMIT = 100000 - -EXECUTEABLEPATH = { - "madx": MADX_BIN, - "python3": PYTHON3_BIN, - "python2": PYTHON2_BIN, -} - - -CMD_SUBMIT = "condor_submit" -JOBFLAVOURS = ( - "espresso", # 20 min - "microcentury", # 1 h - "longlunch", # 2 h - "workday", # 8 h - "tomorrow", # 1 d - "testmatch", # 3 d - "nextweek", # 1 w -) - -NOTIFICATIONS = ("always", "complete", "error", "never") - - -COLUMN_SHELL_SCRIPT = "ShellScript" -COLUMN_JOB_DIRECTORY = "JobDirectory" -COLUMN_JOB_FILE = "JobFile" - - # Subprocess Methods ########################################################### -def create_subfile_from_job(cwd: Path, job: str): - """Write file to submit to ``HTCondor``.""" +def create_subfile_from_job(cwd: Path, submission: Union[str, htcondor.Submit]) -> Path: + """ + Write file to submit to ``HTCondor``. + + Args: + cwd (Path): working directory + submission (str, htcondor.Submit): HTCondor submission definition (i.e. content of the file) + + Returns: + Path: path to sub-file + """ subfile = cwd / SUBFILE LOG.debug(f"Writing sub-file '{str(subfile)}'.") with subfile.open("w") as f: - f.write(str(job)) + f.write(str(submission)) return subfile -def submit_jobfile(jobfile: Path, ssh: str): - """Submit subfile to ``HTCondor`` via subprocess.""" +def submit_jobfile(jobfile: Path, ssh: str) -> None: + """Submit subfile to ``HTCondor`` via subprocess. + + Args: + jobfile (Path): path to sub-file + ssh (str): ssh target + """ proc_args = [CMD_SUBMIT, jobfile] if ssh: proc_args = ["ssh", ssh] + proc_args @@ -87,7 +77,15 @@ def submit_jobfile(jobfile: Path, ssh: str): LOG.info("Jobs successfully submitted.") -def _start_subprocess(command): +def _start_subprocess(command: List[str]) -> int: + """ Start subprocess and log output. + + Args: + command (List[str]): command to execute + + Returns: + int: return code of the process + """ LOG.debug(f"Executing command '{command}'") process = subprocess.Popen( command, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, @@ -102,9 +100,10 @@ def _start_subprocess(command): # Job Creation ################################################################# -def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs): +def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs) -> str: """ - Function to create an ``HTCondor`` job assuming n_files bash-files. + Function to create an ``HTCondor`` submission content for all job-scripts, + i.e. bash-files, in the job_df. Keyword Args: output_dir (str): output directory that will be transferred. Defaults to ``None``. @@ -114,7 +113,11 @@ def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs): retries (int): maximum amount of retries. Default to ``3``. notification (str): Notify under certain conditions. Defaults to ``error``. priority (int): Priority to order your jobs. Defaults to ``None``. + + Returns: + str: HTCondor submission definition. 
""" + # Pre-defined HTCondor arguments for our jobs submit_dict = { "MyId": "htcondor", "universe": "vanilla", @@ -125,9 +128,10 @@ def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs): "on_exit_remove": "(ExitBySignal == False) && (ExitCode == 0)", "requirements": "Machine =!= LastRemoteHost", } - submit_dict.update(_map_kwargs(kwargs)) - - job = htcondor.Submit(submit_dict) + submit_dict.update(map_kwargs(kwargs)) + + # Let the htcondor create the submit-file + submission = htcondor.Submit(submit_dict) # add the multiple bash files scripts = [ @@ -137,20 +141,27 @@ def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs): args = [",".join(parts) for parts in zip(scripts, job_df[COLUMN_JOB_DIRECTORY])] queueArgs = ["queue executable, initialdir from (", *args, ")"] - # ugly but job.setQArgs doesn't take string containing \n - # job.setQArgs("\n".join(queueArgs)) - job = str(job) + "\n".join(queueArgs) - LOG.debug(f"Created HTCondor subfile with content: \n{job}") - return job + # ugly but submission.setQArgs doesn't take string containing '\n': + # submission.setQArgs("\n".join(queueArgs)) # doesn't work + submission = str(submission) + "\n".join(queueArgs) + LOG.debug(f"Created HTCondor subfile with content: \n{submission}") + return submission # Main functions ############################################################### -def make_subfile(cwd: Path, job_df: DataFrame, **kwargs): +def make_subfile(cwd: Path, job_df: DataFrame, **kwargs) -> Path: """ Creates submit-file for ``HTCondor``. For kwargs, see ``create_multijob_for_bashfiles``. + + Args: + cwd (Path): working directory + job_df (DataFrame): DataFrame containing all the job-information + + Returns: + Path: path to the submit-file """ job = create_multijob_for_bashfiles(job_df, **kwargs) return create_subfile_from_job(cwd, job) @@ -163,56 +174,89 @@ def write_bash( cmdline_arguments: dict = None, mask: Union[str, Path] = None, ) -> DataFrame: - """Write the bash-files to be called by ``HTCondor``.""" + """ + Write the bash-files to be called by ``HTCondor``, which in turn call the executable. + Takes as input `Dataframe`, job type, and optional additional commandline arguments for the script. + A shell script is created in each job directory in the dataframe. + + Args: + job_df (DataFrame): DataFrame containing all the job-information + output_dir (str): output directory that will be transferred. Defaults to ``None``. + executable (str): name of the executable. Defaults to ``madx``. + cmdline_arguments (dict): additional commandline arguments for the executable + mask (Union[str, Path]): string or path to the mask-file. Defaults to ``None``. + + Returns: + DataFrame: The provided ``job_df`` but with added path to the scripts. 
+ """ if len(job_df.index) > HTCONDOR_JOBLIMIT: raise AttributeError("Submitting too many jobs for HTCONDOR") - cmds = "" - if cmdline_arguments is not None: - cmds = f" {' '.join([f'{param} {val}' for param, val in cmdline_arguments.items()])}" - - if executable is None: - exec_path = '' - else: - exec_path = f"{str(EXECUTEABLEPATH.get(executable, executable))} " + exec_path = f"{str(EXECUTEABLEPATH.get(executable, executable))} " if executable else '' + cmds = f" {' '.join([f'{param} {val}' for param, val in cmdline_arguments.items()])}" if cmdline_arguments else '' shell_scripts = [None] * len(job_df.index) for idx, (jobid, job) in enumerate(job_df.iterrows()): job_dir = Path(job[COLUMN_JOB_DIRECTORY]) bash_file_name = f"{BASH_FILENAME}.{jobid}.{'bat' if on_windows() else 'sh'}" jobfile = job_dir / bash_file_name + LOG.debug(f"Writing bash-file {idx:d} '{jobfile}'.") with open(jobfile, "w") as f: + # Preparation --- if not on_windows(): - f.write(f"{SHEBANG}\n") + f.write(f"{SHEBANG}\n") + if output_dir is not None: f.write(f"mkdir {str(output_dir)}\n") + + # The actual job execution --- f.write(exec_path) + # Call the mask-file or the filled-template string if is_mask_file(mask): f.write(str(job_dir / job[COLUMN_JOB_FILE])) else: - replace_columns = [column for column in job.index.tolist() if column not in [COLUMN_SHELL_SCRIPT, COLUMN_JOB_DIRECTORY, COLUMN_JOB_FILE]] + replace_columns = [column for column in job.index.tolist() if column not in NON_PARAMETER_COLUMNS] f.write(mask % dict(zip(replace_columns, job[replace_columns]))) + + # Additional commands for the mask/string f.write(cmds) f.write("\n") + + # Manually copy output (if needed) --- + dest_dir = job.get(COLUMN_DEST_DIRECTORY) + if output_dir and dest_dir and output_dir != dest_dir: + if iotools.is_eos_uri(dest_dir): + # Note: eos-cp needs `/` at the end of both, source and target, dirs... + cp_command = f'eos cp -r {_str_ending_with_slash(output_dir)} {_str_ending_with_slash(dest_dir)}' + else: + # ...but '/' at the end of source dir copies only the content on macOS. + cp_command = f'cp -r {output_dir} {_str_ending_with_slash(dest_dir)}' + + f.write(f'{cp_command}\n') + shell_scripts[idx] = bash_file_name + job_df[COLUMN_SHELL_SCRIPT] = shell_scripts return job_df -# Helper ####################################################################### +def map_kwargs(add_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + Maps the kwargs for the job-file. + Some arguments have pre-defined choices and defaults, the remaining ones are just passed on. + Args: + add_dict (Dict[str, Any]): additional kwargs to add to the defaults. -def _map_kwargs(add_dict): - """ - Maps the kwargs for the job-file. Some arguments have pre-defined choices and defaults, - the remaining ones are just passed on. + Returns: + Dict[str, Any]: The mapped kwargs. 
""" new = {} - # Predefined ones - htc_map = { + # Predefined mappings + htc_map = { # name: mapped_name, choices, default "duration": ("+JobFlavour", JOBFLAVOURS, "workday"), "output_dir": ("transfer_output_files", None, None), "accounting_group": ("+AccountingGroup", None, None), @@ -223,14 +267,14 @@ def _map_kwargs(add_dict): try: value = add_dict.pop(key) except KeyError: - if default is not None: - new[mapped] = default + value = default # could be `None` else: if choices is not None and value not in choices: raise TypeError( f"{key} needs to be one of '{str(choices).strip('[]')}' but " f"instead was '{value}'" ) + if value is not None: new[mapped] = _maybe_put_in_quotes(mapped, value) # Pass-Through Arguments @@ -239,20 +283,22 @@ def _map_kwargs(add_dict): return new -def _maybe_put_in_quotes(key, value): +# Helper ####################################################################### + +def _maybe_put_in_quotes(key: str, value: Any) -> Any: + """ Put value in quoted strings if key starts with '+' """ if key.startswith("+"): return f'"{value}"' return value -def is_mask_file(mask): - try: - return Path(mask).is_file() - except OSError: - return False +def _str_ending_with_slash(s: Union[Path, str]) -> str: + """ Add a slash at the end of a path if not present. """ + s = str(s) + if s.endswith("/"): + return s + return f"{s}/" -def is_mask_string(mask): - return not is_mask_file(mask) # Script Mode ################################################################## diff --git a/pylhc_submitter/submitter/iotools.py b/pylhc_submitter/submitter/iotools.py new file mode 100644 index 0000000..5c66c90 --- /dev/null +++ b/pylhc_submitter/submitter/iotools.py @@ -0,0 +1,318 @@ +""" +Job Submitter IO-Tools +---------------------- + +Tools for input and output for the job-submitter. +""" +import itertools +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Sequence, Tuple, Union + +import numpy as np +import pandas as pd +import tfs + +from pylhc_submitter.constants.htcondor import HTCONDOR_JOBLIMIT +from pylhc_submitter.constants.job_submitter import (COLUMN_DEST_DIRECTORY, COLUMN_JOB_DIRECTORY, + COLUMN_JOBID, JOBDIRECTORY_PREFIX, + JOBSUMMARY_FILE, SCRIPT_EXTENSIONS) +from pylhc_submitter.submitter import htc_utils +from pylhc_submitter.submitter.mask import (create_job_scripts_from_mask, generate_jobdf_index, + is_mask_file) + +LOG = logging.getLogger(__name__) + + +@dataclass +class CreationOpts: + """ Options for creating jobs. """ + working_directory: Path # Path to working directory (afs) + mask: Union[Path, str] # Path to mask file or mask-string + jobid_mask: str # Mask for jobid + replace_dict: Dict[str, Any] # Replace-dict + output_dir: Path # Path to local output directory + output_destination: Union[Path, str] # Path or URI to remote output directory (e.g. eos) + append_jobs: bool # Append jobs to existing jobs + resume_jobs: bool # Resume jobs that have already run/failed/got interrupted + executable: str # Name of executable to call the script (from mask) + check_files: Sequence[str] # List of output files to check for success + script_arguments: Dict[str, Any] # Arguments to pass to script + script_extension: str # Extension of the script to run + + def should_drop_jobs(self) -> bool: + """ Check if jobs should be dropped after creating the whole parameter space, + e.g. because they already exist. 
""" + return self.append_jobs or self.resume_jobs + + + +def create_jobs(opt: CreationOpts) -> tfs.TfsDataFrame: + """Main function to prepare all the jobs and folder structure. + This greates the value-grid based on the replace-dict and + checks for existing jobs (if so desired). + A job-dataframe is created - and written out - containing all the information and + its values are used to generate the job-scripts. + It also creates bash-scripts to call the executable for the job-scripts. + + Args: + opt (CreationOpts): Options for creating jobs + + Returns: + tfs.TfsDataFrame: The job-dataframe containing information for all jobs. + """ + LOG.debug("Creating Jobs.") + + # Generate product of replace-dict and compare to existing jobs --- + parameters, values_grid, prev_job_df = _generate_parameter_space( + replace_dict=opt.replace_dict, + append_jobs=opt.append_jobs, + cwd=opt.working_directory, + ) + + # Check new jobs --- + njobs = len(values_grid) + if njobs == 0: + raise ValueError(f"No (new) jobs found!") + + if njobs > HTCONDOR_JOBLIMIT: + LOG.warning( + f"You are attempting to submit an important number of jobs ({njobs})." + "This can be a high stress on your system, make sure you know what you are doing." + ) + + LOG.debug(f"Initial number of jobs: {njobs:d}") + + # Generate new job-dataframe --- + job_df = tfs.TfsDataFrame( + index=generate_jobdf_index(prev_job_df, opt.jobid_mask, parameters, values_grid), + columns=parameters, + data=values_grid, + ) + job_df = tfs.concat([prev_job_df, job_df], sort=False, how_headers='left') + + # Setup folders --- + job_df = create_folders(job_df, opt.working_directory, opt.output_destination) + + # Create scripts --- + if is_mask_file(opt.mask): + LOG.debug("Creating all jobs from mask.") + script_extension = _get_script_extension(opt.script_extension, opt.executable, opt.mask) + job_df = create_job_scripts_from_mask( + job_df, opt.mask, parameters, script_extension + ) + + LOG.debug("Creating shell scripts.") + job_df = htc_utils.write_bash( + job_df, + output_dir=opt.output_dir, + executable=opt.executable, + cmdline_arguments=opt.script_arguments, + mask=opt.mask, + ) + + # Convert paths to strings and write df to file --- + job_df[COLUMN_JOB_DIRECTORY] = job_df[COLUMN_JOB_DIRECTORY].apply(str) + if COLUMN_DEST_DIRECTORY in job_df.columns: + job_df[COLUMN_DEST_DIRECTORY] = job_df[COLUMN_DEST_DIRECTORY].apply(str) + + tfs.write(str(opt.working_directory / JOBSUMMARY_FILE), job_df, save_index=COLUMN_JOBID) + + # Drop already run jobs --- + dropped_jobs = [] + if opt.should_drop_jobs(): + job_df, dropped_jobs = _drop_already_run_jobs( + job_df, opt.output_dir, opt.check_files + ) + return job_df, dropped_jobs + + +def create_folders(job_df: tfs.TfsDataFrame, working_directory: Path, + destination_directory: Union[Path, str] = None) -> tfs.TfsDataFrame: + """Create the folder-structure in the given working directory and the + destination directory if given. + This creates a folder per job in which then the job-scripts and bash-scripts + can be stored later. + + Args: + job_df (tfs.TfsDataFrame): DataFrame containing all the job-information + working_directory (Path): Path to the working directory + destination_directory (Path, optional): Path to the destination directory, + i.e. the directory to copy the outputs to manually. Defaults to None. + + Returns: + tfs.TfsDataFrame: The job-dataframe again, but with the added paths to the job-dirs. 
+ """ + LOG.debug("Setting up folders: ") + + jobname = f"{JOBDIRECTORY_PREFIX}.{{0}}" + job_df[COLUMN_JOB_DIRECTORY] = [working_directory / jobname.format(id_) for id_ in job_df.index] + + for job_dir in job_df[COLUMN_JOB_DIRECTORY]: + job_dir.mkdir(exist_ok=True) + LOG.debug(f" created '{job_dir}'.") + + if destination_directory: + dest_path = uri_to_path(destination_directory) + dest_path.mkdir(parents=True, exist_ok=True) + + server = get_server_from_uri(destination_directory) + job_df[COLUMN_DEST_DIRECTORY] = [f"{server}{dest_path / jobname.format(id_)}" for id_ in job_df.index] + + # Make some symlinks for easy navigation--- + # Output directory -> Working Directory + sym_submission = dest_path / Path('SUBMISSION_DIR') + sym_submission.unlink(missing_ok=True) + sym_submission.symlink_to(working_directory.resolve(), target_is_directory=True) + + # Working Directory -> Output Directory + sym_destination = working_directory / Path('OUTPUT_DIR') + sym_destination.unlink(missing_ok=True) + sym_destination.symlink_to(dest_path.resolve(), target_is_directory=True) + + # Create output dirs per job --- + for job_dest_dir in job_df[COLUMN_DEST_DIRECTORY]: + uri_to_path(job_dest_dir).mkdir(exist_ok=True) + LOG.debug(f" created '{job_dest_dir}'.") + + return job_df + + +def is_eos_uri(path: Union[Path, str, None]) -> bool: + """ Check if the given path is an EOS-URI as `eos cp` only works with those. + E.g.: root://eosuser.cern.ch//eos/user/a/anabramo/banana.txt + + This function does not check the double slashes, + to avoid having the user pass a malformed path by accident and then + assuming it is just a path. This is tested for in + :func:`pylhc_submitter.job_submitter.check_opts`. + """ + if path is None: + return False + + parts = Path(path).parts + return ( + len(parts) >= 3 # at least root:, server, path + and + parts[0].endswith(':') + and + parts[2] == 'eos' + ) + + +def uri_to_path(path: Union[Path, str]) -> Path: + """ Strip EOS path information from a path. + EOS paths for HTCondor can be given as URI. Strip for direct writing. + E.g.: root://eosuser.cern.ch//eos/user/a/anabramo/banana.txt + """ + path = Path(path) + parts = path.parts + if parts[0].endswith(':'): + # the first two parts are host info, e.g `file: //host/path` + return Path('/', *parts[2:]) + return path + + +def get_server_from_uri(path: Union[Path, str]) -> str: + """ Get server information from a path. + E.g.: root://eosuser.cern.ch//eos/user/a/ -> root://eosuser.cern.ch/ + """ + path_part = uri_to_path(path) + if path_part == Path(path): + return "" + + server_part = str(path).replace(str(path_part), '') + if server_part.endswith("//"): + server_part = server_part[:-1] + return server_part + + +def print_stats(new_jobs: Sequence[str], finished_jobs: Sequence[str]): + """Print some quick statistics.""" + text = [ + "\n------------- QUICK STATS ----------------" + f"Jobs total:{len(new_jobs) + len(finished_jobs):d}", + f"Jobs to run: {len(new_jobs):d}", + f"Jobs already finished: {len(finished_jobs):d}", + "---------- JOBS TO RUN: NAMES -------------" + ] + for job_name in new_jobs: + text.append(job_name) + text += ["--------- JOBS FINISHED: NAMES ------------"] + for job_name in finished_jobs: + text.append(job_name) + LOG.info("\n".join(text)) + + +def _generate_parameter_space( + replace_dict: Dict[str, Any], append_jobs: bool, cwd: Path + ) -> Tuple[List[str], np.ndarray, tfs.TfsDataFrame]: + """ Generate parameter space from replace-dict, check for existing jobs. 
""" + LOG.debug("Generating parameter space from replace-dict.") + parameters = list(replace_dict.keys()) + values_grid = _generate_values_grid(replace_dict) + if not append_jobs: + return parameters, values_grid, tfs.TfsDataFrame() + + jobfile_path = cwd / JOBSUMMARY_FILE + try: + prev_job_df = tfs.read(str(jobfile_path.absolute()), index=COLUMN_JOBID) + except FileNotFoundError as filerror: + raise FileNotFoundError( + "Cannot append jobs, as no previous jobfile was found at " f"'{jobfile_path}'" + ) from filerror + new_jobs_mask = [elem not in prev_job_df[parameters].values for elem in values_grid] + values_grid = values_grid[new_jobs_mask] + + return parameters, values_grid, prev_job_df + + +def _generate_values_grid(replace_dict: Dict[str, Any]) -> np.ndarray: + """ Creates an array of the inner-product of the replace-dict. """ + return np.array(list(itertools.product(*replace_dict.values())), dtype=object) + + +def _drop_already_run_jobs( + job_df: tfs.TfsDataFrame, output_dir: str, check_files: str + ) -> Tuple[tfs.TfsDataFrame, List[str]]: + """ Check for jobs that have already been run and drop them from current job_df. """ + LOG.debug("Dropping already finished jobs.") + finished_jobs = [ + idx + for idx, row in job_df.iterrows() + if _job_was_successful(row, output_dir, check_files) + ] + + LOG.info( + f"{len(finished_jobs):d} of {len(job_df.index):d}" + " Jobs have already finished and will be skipped." + ) + + job_df = job_df.drop(index=finished_jobs) + return job_df, finished_jobs + + +def _job_was_successful(job_row: pd.Series, output_dir: str, files: Sequence[str]) -> bool: + """ Determines if the job was successful. + + Args: + job_row (pd.Series): row from the job_df + output_dir (str): Name of the (local) output directory + files (List[str]): list of files that should have been generated + """ + job_dir = job_row.get(COLUMN_DEST_DIRECTORY) or job_row[COLUMN_JOB_DIRECTORY] + output_dir = Path(job_dir, output_dir) + success = output_dir.is_dir() and any(output_dir.iterdir()) + if success and files is not None and len(files): + for f in files: + success &= len(list(output_dir.glob(f))) > 0 + return success + + +def _get_script_extension(script_extension: str, executable: Path, mask: Path) -> str: + """ Returns the extension of the script to run based on + either the given value, its executable or the mask. 
""" + if script_extension is not None: + return script_extension + return SCRIPT_EXTENSIONS.get(executable, mask.suffix) diff --git a/pylhc_submitter/htc/mask.py b/pylhc_submitter/submitter/mask.py similarity index 60% rename from pylhc_submitter/htc/mask.py rename to pylhc_submitter/submitter/mask.py index 616e10f..3a2dcaa 100644 --- a/pylhc_submitter/htc/mask.py +++ b/pylhc_submitter/submitter/mask.py @@ -8,15 +8,17 @@ import logging import re from pathlib import Path +from typing import Iterable, List, Sequence, Set, Union import pandas as pd +from numpy.typing import ArrayLike -from pylhc_submitter.htc.utils import COLUMN_JOB_DIRECTORY, COLUMN_JOB_FILE +from pylhc_submitter.constants.job_submitter import COLUMN_JOB_DIRECTORY, COLUMN_JOB_FILE LOG = logging.getLogger(__name__) -def create_jobs_from_mask( +def create_job_scripts_from_mask( job_df: pd.DataFrame, maskfile: Path, replace_keys: dict, file_ext: str ) -> pd.DataFrame: """ @@ -44,18 +46,19 @@ def create_jobs_from_mask( for idx, (jobid, values) in enumerate(job_df.iterrows()): jobfile_fullpath = (Path(values[COLUMN_JOB_DIRECTORY]) / jobname).with_suffix(file_ext) - with jobfile_fullpath.open("w") as madxjob: - madxjob.write(template % dict(zip(replace_keys, values[list(replace_keys)]))) + with jobfile_fullpath.open("w") as job_file: + job_file.write(template % dict(zip(replace_keys, values[list(replace_keys)]))) jobs[idx] = jobfile_fullpath.name job_df[COLUMN_JOB_FILE] = jobs return job_df -def find_named_variables_in_mask(mask: str): +def find_named_variables_in_mask(mask: str) -> Set[str]: + """ Find all variable-names in the mask. """ return set(re.findall(r"%\((\w+)\)", mask)) -def check_percentage_signs_in_mask(mask: str): +def check_percentage_signs_in_mask(mask: str) -> None: """ Checks for '%' in the mask, that are not replacement variables. """ cleaned_mask = re.sub(r"%\((\w+)\)", "", mask) n_signs = cleaned_mask.count("%") @@ -70,14 +73,42 @@ def check_percentage_signs_in_mask(mask: str): raise KeyError(f"{n_signs} problematic '%' signs found in template. Please remove.") -def generate_jobdf_index(old_df, jobid_mask, keys, values): - """ Generates index for jobdf from mask for job_id naming. """ +def generate_jobdf_index(old_df: pd.DataFrame, jobid_mask: str, keys: Sequence[str], values: ArrayLike + ) -> Union[List[str], Iterable[int]]: + """ Generates index for jobdf from mask for job_id naming. + + Args: + old_df (pd.DataFrame): Existing jobdf. + jobid_mask (str): Mask for naming the jobs. + keys (Sequence[str]): Keys to be replaced in the mask. + values (np.array_like): Values-Grid to be replaced in the mask. + + Returns: + List[str]: Index for jobdf, either list of strings (the filled jobid_masks) or integer-range. + """ if not jobid_mask: + # Use integer-range as index, if no mask is given + # Start with last index if old_df is not None. nold = len(old_df.index) if old_df is not None else 0 start = nold-1 if nold > 0 else 0 return range(start, start + values.shape[0]) + + # Fill job-id mask return [jobid_mask % dict(zip(keys, v)) for v in values] +def is_mask_file(mask: str) -> bool: + """ Check if given string points to a file. """ + try: + return Path(mask).is_file() + except OSError: + return False + + +def is_mask_string(mask: str) -> bool: + """ Checks that given string does not point to a file. 
""" + return not is_mask_file(mask) + + if __name__ == "__main__": raise EnvironmentError(f"{__file__} is not supposed to run as main.") diff --git a/pylhc_submitter/submitter/runners.py b/pylhc_submitter/submitter/runners.py new file mode 100644 index 0000000..c215e07 --- /dev/null +++ b/pylhc_submitter/submitter/runners.py @@ -0,0 +1,121 @@ +""" +Job Submitter Runners +--------------------- + +Defines the methods to run the job-submitter, locally or on HTC. +""" +import logging +import multiprocessing +import subprocess +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, Optional, Tuple + +import pandas as pd +import tfs + +from pylhc_submitter.constants.job_submitter import (COLUMN_DEST_DIRECTORY, COLUMN_JOB_DIRECTORY, + COLUMN_SHELL_SCRIPT) +from pylhc_submitter.submitter import htc_utils +from pylhc_submitter.submitter.iotools import is_eos_uri +from pylhc_submitter.utils.environment import on_windows + +LOG = logging.getLogger(__name__) + + +@dataclass +class RunnerOpts: + """ Options for running the submission. """ + working_directory: Path # Path to the working directory (e.g. afs) + jobflavour: Optional[str] = None # HTCondor job flavour (lengths of the job) + output_dir: Optional[str] = None # Name of the output directory, where jobs store data + ssh: Optional[str] = None # SSH command + dryrun: Optional[bool] = False # Perform only a dry-run, i.e. do all but submit to HTC + htc_arguments: Optional[Dict[str, Any]] = None # Arguments to pass on to htc as keywords + run_local: Optional[bool] = False # Run jobs locally + num_processes: Optional[int] = 4 # Number of processes to run in parallel (locally) + + +def run_jobs(job_df: tfs.TfsDataFrame, opt: RunnerOpts) -> None: + """Selects how to run the jobs. + + Args: + job_df (tfs.TfsDataFrame): DataFrame containing all the job-information + opt (RunnerOpts): Parameters for the runner + """ + if opt.run_local: + run_local(job_df, opt) + else: + run_htc(job_df, opt) + + +def run_local(job_df: tfs.TfsDataFrame, opt: RunnerOpts) -> None: + """Run all jobs locally. + + Args: + job_df (tfs.TfsDataFrame): DataFrame containing all the job-information + opt (RunnerOpts): Parameters for the runner + """ + if opt.dryrun: + LOG.info(f"Dry-run: Skipping local run.") + return + + LOG.info(f"Running {len(job_df.index)} jobs locally in {opt.num_processes:d} processes.") + + pool = multiprocessing.Pool(processes=opt.num_processes) + res = pool.map(_execute_shell, job_df.iterrows()) + if any(res): + jobs_failed = [j for r, j in zip(res, job_df.index) if r] + LOG.error(f"{len(jobs_failed)} of {len(job_df)} jobs have failed:\n {jobs_failed}") + raise RuntimeError("At least one job has failed. Check output logs!") + + +def run_htc(job_df: tfs.TfsDataFrame, opt: RunnerOpts) -> None: + """ Create submission file and submit the jobs to ``HTCondor``. 
+ + Args: + job_df (tfs.TfsDataFrame): DataFrame containing all the job-information + opt (RunnerOpts): Parameters for the runner + """ + LOG.info(f"Submitting {len(job_df.index)} jobs on htcondor, flavour '{opt.jobflavour}'.") + LOG.debug("Creating htcondor subfile.") + + subfile = htc_utils.make_subfile( + opt.working_directory, job_df, + output_dir=opt.output_dir, + duration=opt.jobflavour, + **opt.htc_arguments + ) + + if opt.dryrun: + LOG.info("Dry run: submission file created, but not submitting jobs to htcondor.") + return + + LOG.debug("Submitting jobs to htcondor.") + htc_utils.submit_jobfile(subfile, opt.ssh) + + +# Helper ####################################################################### + +def _execute_shell(df_row: Tuple[Any, pd.Series]) -> int: + """ Execute the shell script. + + Args: + df_row (Tuple[Any, pd.Series]): Row in the job-dataframe as coming from `iterrows()`, + i.e. a tuple of (index, series) + + Returns: + int: return code of the process + """ + _, column = df_row + cmd = [] if on_windows() else ["sh"] + + with Path(column[COLUMN_JOB_DIRECTORY], "log.tmp").open("w") as logfile: + process = subprocess.Popen( + cmd + [column[COLUMN_SHELL_SCRIPT]], + shell=on_windows(), + stdout=logfile, + stderr=subprocess.STDOUT, + cwd=column[COLUMN_JOB_DIRECTORY], + ) + return process.wait() diff --git a/pylhc_submitter/utils/environment_tools.py b/pylhc_submitter/utils/environment.py similarity index 100% rename from pylhc_submitter/utils/environment_tools.py rename to pylhc_submitter/utils/environment.py diff --git a/pylhc_submitter/utils/iotools.py b/pylhc_submitter/utils/iotools.py index 261861e..31a50b5 100644 --- a/pylhc_submitter/utils/iotools.py +++ b/pylhc_submitter/utils/iotools.py @@ -4,8 +4,8 @@ Tools for input and output. """ -from pathlib import Path from datetime import datetime +from pathlib import Path from typing import Iterable from generic_parser.entry_datatypes import get_instance_faker_meta @@ -13,7 +13,6 @@ from pylhc_submitter.constants.general import TIME - # Output ----------------------------------------------------------------------- diff --git a/tests/unit/test_job_submitter.py b/tests/unit/test_job_submitter.py index 308202b..f395840 100644 --- a/tests/unit/test_job_submitter.py +++ b/tests/unit/test_job_submitter.py @@ -1,10 +1,14 @@ +import itertools +from dataclasses import asdict, dataclass, field from pathlib import Path +from typing import Any, Dict, List, Optional, Sequence, Union +import numpy as np import pytest -from generic_parser import DotDict from pylhc_submitter.job_submitter import main as job_submit -from pylhc_submitter.utils.environment_tools import on_linux, on_windows +from pylhc_submitter.submitter.iotools import get_server_from_uri, is_eos_uri, uri_to_path +from pylhc_submitter.utils.environment import on_linux, on_windows SUBFILE = "queuehtc.sub" @@ -19,162 +23,286 @@ @pytest.mark.parametrize("maskfile", [True, False]) def test_job_creation_and_localrun(tmp_path, maskfile): - args, setup = _create_setup(tmp_path, mask_file=maskfile) - setup.update(run_local=True) - job_submit(**setup) - _test_output(args) + """ Tests that the jobs are created and can be run locally + from mask-string and mask-file. """ + setup = InputParameters(working_directory=tmp_path, run_local=True) + setup.create_mask(as_file=maskfile) + job_submit(**asdict(setup)) + _test_output(setup) + + +def test_output_directory(tmp_path): + """ Tests that the output is copied to the output destination. 
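+    The destination here is a plain local path; an EOS-URI (``root://eosuser.cern.ch/...``)
+    would be accepted as well, as exercised in ``test_htc_submit`` below.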
+    As a by-product it also tests that the jobs are created and can be run locally. """
+    setup = InputParameters(
+        working_directory=tmp_path,
+        run_local=True,
+        output_destination=tmp_path / "my_new_output" / "long_path",
+    )
+    setup.create_mask()
+    job_submit(**asdict(setup))
+    _test_output(setup)
+
+
+def test_detects_wrong_uri(tmp_path):
+    """ Tests that a malformed EOS-URI is detected and rejected. """
+    setup = InputParameters(
+        working_directory=tmp_path,
+        run_local=True,
+        output_destination="root:/eosuser.cern.ch/eos/my_new_output",
+    )
+    setup.create_mask()
+    with pytest.raises(ValueError) as e:
+        job_submit(**asdict(setup))
+    assert "EOS-URI" in str(e)
 
 
 @run_only_on_linux
 def test_job_creation_and_localrun_with_multiline_maskstring(tmp_path):
+    """ Tests that the jobs are created and can be run locally from a multiline mask-string. """
     mask = "123\"\" \nsleep 0.1 \n/bin/bash -c \"echo \"%(PARAM1)s.%(PARAM2)s"
-    args, setup = _create_setup(tmp_path, mask_content=mask, mask_file=False)
-    setup.update(run_local=True)
-    job_submit(**setup)
-    _test_output(args)
+    setup = InputParameters(working_directory=tmp_path, run_local=True)
+    setup.create_mask(content=mask, as_file=False)
+    job_submit(**asdict(setup))
+    _test_output(setup)
 
 
 @run_only_on_linux
 @pytest.mark.parametrize("maskfile", [True, False])
 def test_job_creation_and_dryrun(tmp_path, maskfile):
-    args, setup = _create_setup(tmp_path, mask_file=maskfile)
-    setup.update(dryrun=True)
-    job_submit(**setup)
+    """ Tests that the jobs are created as dry-run, from mask-file and from mask-string. """
+    setup = InputParameters(working_directory=tmp_path, dryrun=True)
+    setup.create_mask(as_file=maskfile)
+    job_submit(**asdict(setup))
     _test_subfile_content(setup)
-    _test_output(args, post_run=False)
+    _test_output(setup, post_run=False)
 
 
 @run_only_on_linux
 @pytest.mark.parametrize("maskfile", [True, False])
 def test_find_errorneous_percentage_signs(tmp_path, maskfile):
+    """ Tests that a KeyError is raised if the mask contains percentage signs
+    that are not part of the replacement parameters. """
     mask = "%(PARAM1)s.%(PARAM2)d\nsome stuff # should be 5%\nsome % more % stuff."
-    args, setup = _create_setup(tmp_path, mask_content=mask, mask_file=maskfile)
+    setup = InputParameters(working_directory=tmp_path)
+    setup.create_mask(content=mask, as_file=maskfile)
     with pytest.raises(KeyError) as e:
-        job_submit(**setup)
-    assert "problematic '%'" in e.value.args[0]
+        job_submit(**asdict(setup))
+    assert "problematic '%'" in str(e)
 
 
 @run_only_on_linux
 @pytest.mark.parametrize("maskfile", [True, False])
 def test_missing_keys(tmp_path, maskfile):
+    """ Tests that a KeyError is raised if the mask references keys missing from the replacement dict. """
     mask = "%(PARAM1)s.%(PARAM2)s.%(PARAM3)s"
-    args, setup = _create_setup(tmp_path, mask_content=mask, mask_file=maskfile)
+    setup = InputParameters(working_directory=tmp_path)
+    setup.create_mask(content=mask, as_file=maskfile)
     with pytest.raises(KeyError) as e:
-        job_submit(**setup)
-    assert "PARAM3" in e.value.args[0]
+        job_submit(**asdict(setup))
+    assert "PARAM3" in str(e)
 
 
 @run_if_not_linux
-def test_not_on_linux(tmp_path):
-    args, setup = _create_setup(tmp_path)
+def test_htcondor_bindings_not_found_on_nonlinux_os(tmp_path):
+    """ Tests that an error is raised if the htcondor bindings are not found.
+    If this test fails, it might mean that htcondor bindings have become
+    available for the other platforms.
""" + setup = InputParameters(working_directory=tmp_path) + setup.create_mask() with pytest.raises(EnvironmentError) as e: - job_submit(**setup) - assert "htcondor bindings" in e.value.args[0] + job_submit(**asdict(setup)) + assert "htcondor bindings" in str(e) + + +@pytest.mark.skipif(on_windows(), reason="Paths are not split on '/' on Windows.") +def test_eos_uri_manipulation_functions(): + """ Unit-test for the EOS-URI parsing. (OH LOOK! An actual unit test!)""" + server = "root://eosuser.cern.ch/" + path = "/eos/user/m/mmustermann/" + uri = f"{server}{path}" + assert is_eos_uri(uri) + assert not is_eos_uri(path) + assert uri_to_path(uri) == Path(path) + assert get_server_from_uri(uri) == server @run_only_on_linux @pytest.mark.cern_network -def test_htc_submit(): - """ This test is here for local testing only. You need to adapt the path - and delete the results afterwards manually (so you can check them before.""" - user = "jdilly" - path = Path("/", "afs", "cern.ch", "user", user[0], user, "htc_temp") +@pytest.mark.parametrize("destination", [True, False]) +@pytest.mark.parametrize("uri", [False, True]) +def test_htc_submit(destination: bool, uri: bool): + """ This test is here for manual testing. + It runs 3 scenarios and each submits 6 jobs to HTCondor. + This means you need to be in the cern-network on a machine with afs and eos access + and htcondor installed. + You need to adapt the path to your user-name and delete the results afterwards manually. + + Scenarios: + a) destination = False: Transfer output data back to afs + b) destination = True, uri = False: Copy output data to EOS (via eos path) + c) destination = True, uri = True: Copy output data to EOS (via eos uri) + + Run this test twice, manually changing `prerun` from "True" to "False" after the jobs are finished. + - `prerun = True`: create the folder structures and submit the jobs. + - `prerun = False`: check that the output data is present. + """ + # MANUAL THINGS TO CHANGE ############################################## + user = "mmustermann" # set your username + tmp_name = "htc_temp" # name for the temporary folder (will be created) + prerun = True + # prerun = False # switch here after jobs finished. + + # Uncomment to fix the kerberos ticket, in case htcondor doesn't find it. + # Do a `klist` in terminal and adapt the path. + # import os + # os.environ["KRB5CCNAME"] = "/tmp/krb5cc_####" + ######################################################################## + if uri and not destination: + return # only need to run one when destination is not set + + # set working_directory + if destination: + tmp_name = f"{tmp_name}_dest" + if uri: + tmp_name = f"{tmp_name}_uri" + + path = Path("/", "afs", "cern.ch", "user", user[0], user, tmp_name) path.mkdir(exist_ok=True) - args, setup = _create_setup(path) - - job_submit(**setup) - _test_subfile_content(setup) - _test_output(args, post_run=False) - # _test_output(args, post_run=True) # you can use this if you like after htcondor is done - -# Helper ----------------------------------------------------------------------- - - -def _create_setup(cwd_path: Path, mask_content: str = None, mask_file: bool = True): - """ Create a quick setup for Parameters PARAM1 and PARAM2. 
""" - out_name = "out.txt" - out_dir = "Outputdir" - - args = DotDict( - cwd=cwd_path, - out_name=out_name, - out_dir=out_dir, - id="%(PARAM1)s.%(PARAM2)d", - mask_name="test_script.mask", - ext=".bat" if on_windows() else ".sh", - out_file=Path(out_dir, out_name), - p1_list=["a", "b"], - p2_list=[1, 2, 3], - mask_file=mask_file - ) - - mask_string = _make_executable_string(args, mask_content) - if args.mask_file: - mask_path = args.cwd / args.mask_name - with mask_path.open("w") as f: - f.write(mask_string) - - setup = dict( - executable=None if on_windows() else "/bin/bash", - script_extension=args.ext, - job_output_dir=out_dir, - mask=str(mask_path) if args.mask_file else mask_string, - replace_dict=dict(PARAM1=args.p1_list, PARAM2=args.p2_list), - jobid_mask=args.id, - jobflavour="workday", - resume_jobs=True, - check_files=[args.out_name], - working_directory=str(args.cwd), - dryrun=False, - run_local=False, - htc_arguments={"max_retries": "4", "some_other_argument": "some_other_parameter"}, + # set output_destination + dest = None + if destination: + dest = f"/eos/user/{user[0]}/{user}/{tmp_name}" + if uri: + dest = f"root://eosuser.cern.ch/{dest}" + + # create setup + setup = InputParameters( + working_directory=path, + output_destination=dest, + # dryrun=True ) - return args, setup + setup.create_mask() - -def _make_executable_string(args, mask_content): - if mask_content is None: - mask_content = args.id - - if on_windows(): - mask_string = f'echo {mask_content}> "{args.out_file}"' + if prerun: + # submit jobs + job_submit(**asdict(setup)) + _test_subfile_content(setup) + _test_output(setup, post_run=False) else: - mask_string = f'echo "{mask_content}" > "{args.out_file}"' - if not args.mask_file: - mask_string = " ".join(['-c "', mask_string, '"']) - return f"{mask_string}\n" + # check output + _test_output(setup, post_run=True) -def _test_subfile_content(setup): - subfile = Path(setup['working_directory']) / SUBFILE +# Helper ----------------------------------------------------------------------- + +@dataclass +class InputParameters: + """ job_submitter input parameters. 
""" + working_directory: Path + executable: Optional[str] = None if on_windows() else "/bin/bash" + script_extension: Optional[str] =".bat" if on_windows() else ".sh" + job_output_dir: Optional[str] = "Outputdir" + jobid_mask: Optional[str] = "%(PARAM1)s.%(PARAM2)d" + replace_dict: Optional[Dict] = field(default_factory=lambda: dict(PARAM1=["a", "b"], PARAM2=[1, 2, 3])) + jobflavour: Optional[str] = "workday" + resume_jobs: Optional[bool] = True + check_files: Optional[Sequence] = field(default_factory=lambda: ["out.txt",]) + dryrun: Optional[bool] = False + run_local: Optional[bool] = False + htc_arguments: Optional[Dict] = field(default_factory=lambda: {"max_retries": "4", "some_other_argument": "some_other_parameter"}) + output_destination: Optional[Path] = None + mask: Union[Path, str] = None # will be set in create_mask + + def create_mask(self, name: str = "test_script.mask", content: str = None, as_file: bool = False): + output_file = Path(self.job_output_dir, self.check_files[0]) + + if content is None: + content = self.jobid_mask + + if on_windows(): + mask_string = f'echo {content}> "{output_file!s}"' + else: + mask_string = f'echo "{content}" > "{output_file!s}"' + if not as_file: + mask_string = " ".join(['-c "', mask_string, '"']) + + mask_string = f"{mask_string}\n" + + if as_file: + mask_path = self.working_directory / name + with mask_path.open("w") as f: + f.write(mask_string) + self.mask = mask_path + else: + self.mask = mask_string + + +def _test_subfile_content(setup: InputParameters): + """ Checks some of the content of the subfile (queuehtc.sub). """ + subfile = setup.working_directory / SUBFILE assert subfile.exists() with subfile.open("r") as sfile: filecontents = dict(line.rstrip().split(" = ") for line in sfile if " = " in line) - assert filecontents["MY.JobFlavour"].strip('"') == setup["jobflavour"] # flavour is saved with "" in .sub, and read in with them - assert filecontents["transfer_output_files"] == setup["job_output_dir"] - for key in setup["htc_arguments"].keys(): - assert filecontents[key] == setup["htc_arguments"][key] - - -def _test_output(args, post_run=True): - for p1 in args.p1_list: - for p2 in args.p2_list: - current_id = args.id % dict(PARAM1=p1, PARAM2=p2) - job_name = f"Job.{current_id}" - job_dir_path = args.cwd / job_name - out_dir_path = job_dir_path / args.out_dir - out_file_path = out_dir_path / args.out_name - - assert job_dir_path.exists() - assert job_dir_path.is_dir() - if args.mask_file: - assert (job_dir_path / args.mask_name).with_suffix(args.ext).exists() - # assert out_dir_path.exists() # does not seem to be pre-created anymore (jdilly 2021-05-04) - if post_run: - assert out_dir_path.is_dir() - assert out_file_path.exists() - assert out_file_path.is_file() - - with out_file_path.open("r") as f: - assert f.read().strip("\n") == current_id + assert filecontents["MY.JobFlavour"].strip('"') == setup.jobflavour # flavour is saved with "" in .sub, and read in with them + if setup.output_destination is None: + assert filecontents["transfer_output_files"] == setup.job_output_dir + else: + assert "transfer_output_files" not in filecontents + + for key in setup.htc_arguments.keys(): + assert filecontents[key] == setup.htc_arguments[key] + + +def _test_output(setup: InputParameters, post_run: bool = True): + """ Checks the validity of the output. 
""" + + combinations = _generate_combinations(setup.replace_dict) + assert len(combinations) + assert len(combinations) == np.prod([len(v) for v in setup.replace_dict.values()]) + + for combination_instance in combinations: + current_id = setup.jobid_mask % combination_instance + job_name = f"Job.{current_id}" + + if isinstance(setup.mask, Path): + assert (setup.working_directory / job_name / setup.mask.name).with_suffix(setup.script_extension).exists() + + def _check_output_content(dir_path: Path, check_output: bool = True): + # Check if the code created the folder structure --- + job_path = uri_to_path(dir_path) / job_name + + assert job_path.exists() + assert job_path.is_dir() + + if check_output: # Check if the jobs created the files --- + out_dir_path = job_path / setup.job_output_dir + out_file_path = out_dir_path / setup.check_files[0] + + assert out_dir_path.is_dir() + assert out_file_path.exists() + assert out_file_path.is_file() + + with out_file_path.open("r") as f: + assert f.read().strip("\n") == current_id + + # Check local working directory --- + _check_output_content(setup.working_directory, check_output=post_run and setup.output_destination is None) + + if setup.output_destination is not None: + # Check copy at output destination --- + _check_output_content(setup.output_destination, check_output=post_run) + + +def _generate_combinations(data: Dict[str, Sequence]) -> List[Dict[str, Any]]: + """ Creates all possible combinations of values in data as a list of dictionaries. """ + keys = list(data.keys()) + all_values = [data[key] for key in keys] + + combinations = [ + {keys[i]: values[i] for i in range(len(keys))} + for values in itertools.product(*all_values) + ] + + return combinations