diff --git a/.gitignore b/.gitignore
index 713d8a6..5e133cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -243,9 +243,14 @@ Temporary Items
# Neovim
.nvimlog
+# Intellij
/.idea/codeStyles/codeStyleConfig.xml
/.idea/misc.xml
/.idea/modules.xml
/.idea/inspectionProfiles/profiles_settings.xml
/.idea/vcs.xml
/.idea/PyhDToolkit.iml
+
+# Other
+tst_*
+
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6c0c717..75e8f64 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,18 @@
# `pylhc-submitter` Changelog
+## Version 2.0.0
+
+- General code cleanup/refactoring/documentation:
+  - Partly breaks backward compatibility if individual methods of the `job_submitter` functionality have been used.
+  - Does not affect setups that simply call the `main()` function of `job_submitter.py` or run `job_submitter` as a module.
+  - Apart from some imports that were fixed to follow the new structure, the `autosix` module remains untouched.
+
+
+- New Feature of `job_submitter`:
+  - `output_destination` input parameter, which sets an output directory in which the folder-structure
+    for the jobs is replicated and into which each job's `job_output_dir` is copied "manually" at the end of the job,
+    instead of having the directory transferred back to the `working_directory` by htcondor (see the sketch below).
+
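+  A minimal sketch of how this could be used when calling `main()` directly
+  (all paths and values are placeholders):
+
+  ```python
+  from pylhc_submitter.job_submitter import main
+
+  main(
+      working_directory="/afs/cern.ch/work/u/user/study",   # jobs are created here
+      mask="my_madx.mask",                                   # mask with %(SEED)s placeholders
+      replace_dict={"SEED": [1, 2, 3]},                      # one job per value
+      output_destination="root://eosuser.cern.ch//eos/user/u/user/study_output",  # new in 2.0.0
+  )
+  ```
+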
## Version 1.1.1
- Uses `concat` instead of `append` to stack the DataFrames.
diff --git a/doc/Makefile b/doc/Makefile
index 08f8749..ff66ee4 100644
--- a/doc/Makefile
+++ b/doc/Makefile
@@ -48,9 +48,9 @@ html:
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
josch:
- $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) /home/jdilly/Software/Documentation/submitter-doc
+ $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) ../../Documentation/submitter-doc
@echo
- @echo "Build finished. The HTML pages are in /home/jdilly/Software/Documentation/submitter-doc."
+ @echo "Build finished. The HTML pages are in ../../Documentation/submitter-doc."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
diff --git a/doc/_static/css/custom.css b/doc/_static/css/custom.css
index f201296..eea8824 100644
--- a/doc/_static/css/custom.css
+++ b/doc/_static/css/custom.css
@@ -1,7 +1,41 @@
+:root {
+ --nav-side-width: 300px; /* default is 300px */
+ /* for 100% width */
+ /*--nav-content-width: 100%;*/
+ /*--local-toc-width: 300px;*/
+ /*--nav-content-width-wide: calc(100% - var(--local-toc-width)); /* 100% here is fullscreen */
+ /*--local-toc-left: calc(100% - var(--local-toc-width)); /* 100% here is w/o sidebar */
+
+ /* for fixed widths */
+ --nav-content-width: 800px; /* default is 800px */
+ --nav-content-width-wide: var(--nav-content-width);
+ --local-toc-width: calc(100% - var(--nav-content-width-wide));
+ --local-toc-left: calc(var(--nav-content-width-wide) + var(--nav-side-width));
+}
+
+/* main content width */
+.wy-nav-content {
+ max-width: var(--nav-content-width);
+}
+
+/* Sidebar width */
+.wy-nav-side {
+ width: var(--nav-side-width);
+}
+
.wy-side-nav-search {
background: rgb(243,244,247);
}
+.wy-side-nav-search > a {
+ color: black;
+}
+
+.wy-side-nav-search > a img.logo {
+ width: 50%;
+}
+
+
.wy-side-nav-search > div.version {
color: black;
}
@@ -182,3 +216,60 @@ em.sig-param span.default_value {
.rst-content table.field-list th {
padding: 16px;
}
+
+
+/* Create local table of contents
+ ------------------------------
+ inspired by https://github.com/readthedocs/sphinx_rtd_theme/pull/919
+ and https://github.com/readthedocs/sphinx_rtd_theme/issues/764
+ see also _templates/layout.html
+ */
+
+#local-table-of-contents {
+ padding-bottom: 20px;
+ /* display: none; */
+}
+
+/* Mask entry of main header (chapter) */
+#local-table-of-contents a[href="#"]{
+ /*display: none;*/
+}
+
+/* indent subsections */
+#local-table-of-contents ul > ul {
+ padding-left: 0px;
+ margin-left: 20px;
+ padding-right: 0;
+ padding-bottom: 5px;
+}
+
+
+#local-table-of-contents-title {
+ margin-bottom: 10px;
+}
+
+/* Show in Sidebar if window width is larger than nav-side + nav-content + toc-width */
+@media screen and (min-width: 1200px) {
+ .wy-nav-content {
+ max-width: var(--nav-content-width-wide);
+ }
+
+ #local-table-of-contents {
+ display: block;
+ position: fixed;
+ margin-left: 15px;
+ overflow-y: auto;
+ height: 95%;
+ top: 45px;
+ left: var(--local-toc-left);
+ width: var(--local-toc-width);
+ }
+
+ #local-table-of-contents-title {
+ display: block;
+ font-size: 16px;
+ width: 100%;
+ padding-top: 10px;
+ padding-bottom: 5px;
+ }
+}
\ No newline at end of file
diff --git a/doc/_templates/layout.html b/doc/_templates/layout.html
new file mode 100644
index 0000000..aa67d6d
--- /dev/null
+++ b/doc/_templates/layout.html
@@ -0,0 +1,12 @@
+{% extends "!layout.html" %}
+{% block document %}
+ {%- if toc|length > title|length + 75 %}
+
+ {%- endif %}
+
+ {{ super() }}
+{% endblock %}
+
diff --git a/doc/conf.py b/doc/conf.py
index 6fbff20..283ad0a 100644
--- a/doc/conf.py
+++ b/doc/conf.py
@@ -66,9 +66,11 @@ def about_package(init_posixpath: pathlib.Path) -> dict:
"sphinx.ext.githubpages",
"sphinx.ext.napoleon",
]
+autosectionlabel_prefix_document = True
+autosectionlabel_maxdepth = 2
# Add any paths that contain templates here, relative to this directory.
-# templates_path = ['_templates']
+templates_path = ['_templates']
# The suffix(es) of source filenames.
# You can specify multiple suffix as a list of string:
@@ -84,6 +86,11 @@ def about_package(init_posixpath: pathlib.Path) -> dict:
copyright_ = "2019, pyLHC/OMC-TEAM"
author = ABOUT_PYLHC_SUBMITTER["__author__"]
+# Override link in 'Edit on Github'
+rst_prolog = f"""
+:github_url: {ABOUT_PYLHC_SUBMITTER['__url__']}
+"""
+
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
@@ -98,7 +105,7 @@ def about_package(init_posixpath: pathlib.Path) -> dict:
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
-language = None
+language = 'en'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
diff --git a/doc/entrypoints/autosix.rst b/doc/entrypoints/autosix.rst
index 470e586..0276dfc 100644
--- a/doc/entrypoints/autosix.rst
+++ b/doc/entrypoints/autosix.rst
@@ -1,2 +1,3 @@
.. automodule:: pylhc_submitter.autosix
:members:
+ :noindex:
diff --git a/doc/entrypoints/job_submitter.rst b/doc/entrypoints/job_submitter.rst
index 7854e56..675569c 100644
--- a/doc/entrypoints/job_submitter.rst
+++ b/doc/entrypoints/job_submitter.rst
@@ -1,2 +1,3 @@
.. automodule:: pylhc_submitter.job_submitter
:members:
+ :noindex:
diff --git a/doc/modules/constants.rst b/doc/modules/constants.rst
index a0a2e36..8474b25 100644
--- a/doc/modules/constants.rst
+++ b/doc/modules/constants.rst
@@ -1,15 +1,25 @@
Constants Definitions
-**************************
+*********************
.. automodule:: pylhc_submitter.constants.general
:members:
+ :noindex:
.. automodule:: pylhc_submitter.constants.external_paths
:members:
+ :noindex:
+.. automodule:: pylhc_submitter.constants.job_submitter
+ :members:
+ :noindex:
+
+.. automodule:: pylhc_submitter.constants.htcondor
+ :members:
+ :noindex:
.. automodule:: pylhc_submitter.constants.autosix
:members:
+ :noindex:
diff --git a/doc/modules/htc.rst b/doc/modules/htc.rst
deleted file mode 100644
index f4965b0..0000000
--- a/doc/modules/htc.rst
+++ /dev/null
@@ -1,9 +0,0 @@
-HTCondor Tools
-**************************
-
-.. automodule:: pylhc_submitter.htc.utils
- :members:
-
-
-.. automodule:: pylhc_submitter.htc.mask
- :members:
diff --git a/doc/modules/sixdesk_tools.rst b/doc/modules/sixdesk_tools.rst
index 072ad58..0271a84 100644
--- a/doc/modules/sixdesk_tools.rst
+++ b/doc/modules/sixdesk_tools.rst
@@ -3,21 +3,28 @@ Sixdesk Tools
.. automodule:: pylhc_submitter.sixdesk_tools.stages
:members:
+ :noindex:
.. automodule:: pylhc_submitter.sixdesk_tools.create_workspace
:members:
+ :noindex:
.. automodule:: pylhc_submitter.sixdesk_tools.submit
:members:
+ :noindex:
.. automodule:: pylhc_submitter.sixdesk_tools.post_process_da
:members:
+ :noindex:
.. automodule:: pylhc_submitter.sixdesk_tools.extract_data_from_db
:members:
+ :noindex:
.. automodule:: pylhc_submitter.sixdesk_tools.utils
:members:
+ :noindex:
.. automodule:: pylhc_submitter.sixdesk_tools.troubleshooting
:members:
+ :noindex:
diff --git a/doc/modules/submitter.rst b/doc/modules/submitter.rst
new file mode 100644
index 0000000..ac54d63
--- /dev/null
+++ b/doc/modules/submitter.rst
@@ -0,0 +1,19 @@
+Submitter
+*********
+
+.. automodule:: pylhc_submitter.submitter.htc_utils
+ :members:
+ :noindex:
+
+
+.. automodule:: pylhc_submitter.submitter.iotools
+ :members:
+ :noindex:
+
+.. automodule:: pylhc_submitter.submitter.mask
+ :members:
+ :noindex:
+
+.. automodule:: pylhc_submitter.submitter.runners
+ :members:
+ :noindex:
diff --git a/doc/modules/utils.rst b/doc/modules/utils.rst
index aa7ecb0..107fb06 100644
--- a/doc/modules/utils.rst
+++ b/doc/modules/utils.rst
@@ -3,7 +3,9 @@ Utilities
.. automodule:: pylhc_submitter.utils.iotools
:members:
+ :noindex:
.. automodule:: pylhc_submitter.utils.logging_tools
:members:
+ :noindex:
diff --git a/pylhc_submitter/__init__.py b/pylhc_submitter/__init__.py
index 62de331..ed21d9e 100644
--- a/pylhc_submitter/__init__.py
+++ b/pylhc_submitter/__init__.py
@@ -10,7 +10,7 @@
__title__ = "pylhc_submitter"
__description__ = "pylhc-submitter contains scripts to simplify the creation and submission of jobs to HTCondor at CERN"
__url__ = "https://github.com/pylhc/submitter"
-__version__ = "1.1.1"
+__version__ = "2.0.0"
__author__ = "pylhc"
__author_email__ = "pylhc@github.com"
__license__ = "MIT"
diff --git a/pylhc_submitter/autosix.py b/pylhc_submitter/autosix.py
index f1f908c..09b2903 100644
--- a/pylhc_submitter/autosix.py
+++ b/pylhc_submitter/autosix.py
@@ -193,34 +193,17 @@
import numpy as np
import tfs
-from generic_parser import EntryPointParameters, entrypoint, DotDict
+from generic_parser import EntryPointParameters, entrypoint
from generic_parser.entry_datatypes import DictAsString
-from pylhc_submitter.constants.autosix import (
- HEADER_BASEDIR,
- SIXENV_REQUIRED,
- SIXENV_OPTIONAL,
- AutoSixEnvironment,
-)
-from pylhc_submitter.htc.mask import generate_jobdf_index
-from pylhc_submitter.job_submitter import (
- JOBSUMMARY_FILE,
- COLUMN_JOBID,
-)
-from pylhc_submitter.sixdesk_tools.create_workspace import (
- set_max_materialize
-)
-from pylhc_submitter.sixdesk_tools.stages import Stage, STAGE_ORDER
-from pylhc_submitter.sixdesk_tools.utils import (
- is_locked,
- check_mask,
-)
-from pylhc_submitter.utils.iotools import (
- PathOrStr,
- save_config,
- make_replace_entries_iterable,
- keys_to_path
-)
+from pylhc_submitter.constants.autosix import (HEADER_BASEDIR, SIXENV_OPTIONAL, SIXENV_REQUIRED,
+ AutoSixEnvironment)
+from pylhc_submitter.constants.job_submitter import COLUMN_JOBID, JOBSUMMARY_FILE
+from pylhc_submitter.submitter.mask import generate_jobdf_index
+from pylhc_submitter.sixdesk_tools.stages import STAGE_ORDER, Stage
+from pylhc_submitter.sixdesk_tools.utils import check_mask, is_locked
+from pylhc_submitter.utils.iotools import (PathOrStr, keys_to_path, make_replace_entries_iterable,
+ save_config)
from pylhc_submitter.utils.logging_tools import log_setup
LOG = logging.getLogger(__name__)
diff --git a/pylhc_submitter/constants/autosix.py b/pylhc_submitter/constants/autosix.py
index 39d0945..b91065d 100644
--- a/pylhc_submitter/constants/autosix.py
+++ b/pylhc_submitter/constants/autosix.py
@@ -1,6 +1,6 @@
"""
-Constants: Autosix
-----------------------------------
+Autosix
+-------
Collections of constants and paths used in autosix.
diff --git a/pylhc_submitter/constants/external_paths.py b/pylhc_submitter/constants/external_paths.py
index d470cbf..2e7e9e2 100644
--- a/pylhc_submitter/constants/external_paths.py
+++ b/pylhc_submitter/constants/external_paths.py
@@ -1,6 +1,6 @@
"""
-Constants: External Paths
--------------------------
+External Paths
+--------------
Specific constants relating to external paths to be used,
to help with consistency.
diff --git a/pylhc_submitter/constants/general.py b/pylhc_submitter/constants/general.py
index 59f796d..2c68d93 100644
--- a/pylhc_submitter/constants/general.py
+++ b/pylhc_submitter/constants/general.py
@@ -1,6 +1,6 @@
"""
-Constants: General
-------------------
+General
+-------
General constants to help with consistency.
"""
diff --git a/pylhc_submitter/constants/htcondor.py b/pylhc_submitter/constants/htcondor.py
new file mode 100644
index 0000000..8e9ac24
--- /dev/null
+++ b/pylhc_submitter/constants/htcondor.py
@@ -0,0 +1,24 @@
+"""
+HTCondor
+--------
+
+Constants for the HTCondor parameters.
+"""
+SHEBANG = "#!/bin/bash"
+SUBFILE = "queuehtc.sub"
+BASH_FILENAME = "Job"
+
+HTCONDOR_JOBLIMIT = 100000
+
+CMD_SUBMIT = "condor_submit"
+JOBFLAVOURS = (
+ "espresso", # 20 min
+ "microcentury", # 1 h
+ "longlunch", # 2 h
+ "workday", # 8 h
+ "tomorrow", # 1 d
+ "testmatch", # 3 d
+ "nextweek", # 1 w
+)
+
+NOTIFICATIONS = ("always", "complete", "error", "never")
\ No newline at end of file
diff --git a/pylhc_submitter/constants/job_submitter.py b/pylhc_submitter/constants/job_submitter.py
new file mode 100644
index 0000000..93c1236
--- /dev/null
+++ b/pylhc_submitter/constants/job_submitter.py
@@ -0,0 +1,33 @@
+"""
+Job Submitter
+-------------
+
+Collections of constants and paths used in the job-submitter.
+"""
+from pylhc_submitter.constants.external_paths import MADX_BIN, PYTHON2_BIN, PYTHON3_BIN
+
+JOBSUMMARY_FILE = "Jobs.tfs"
+JOBDIRECTORY_PREFIX = "Job"
+CONFIG_FILE = "config.ini"
+
+SCRIPT_EXTENSIONS = {
+ "madx": ".madx",
+ "python3": ".py",
+ "python2": ".py",
+}
+
+EXECUTEABLEPATH = {
+ "madx": MADX_BIN,
+ "python3": PYTHON3_BIN,
+ "python2": PYTHON2_BIN,
+}
+
+
+COLUMN_JOBID = "JobId"
+COLUMN_SHELL_SCRIPT = "ShellScript"
+COLUMN_JOB_DIRECTORY = "JobDirectory"
+COLUMN_DEST_DIRECTORY = "DestDirectory"
+COLUMN_JOB_FILE = "JobFile"
+
+NON_PARAMETER_COLUMNS = (COLUMN_SHELL_SCRIPT, COLUMN_JOB_DIRECTORY, COLUMN_JOB_FILE, COLUMN_DEST_DIRECTORY)
\ No newline at end of file
diff --git a/pylhc_submitter/job_submitter.py b/pylhc_submitter/job_submitter.py
index dda9aa3..6efe9b8 100644
--- a/pylhc_submitter/job_submitter.py
+++ b/pylhc_submitter/job_submitter.py
@@ -14,124 +14,173 @@
and job directory for further post processing.
For additional information and guides, see the `Job Submitter page
-`_ in the ``OMC`` documentation site.
+`_ in the ``OMC`` documentation site.
+
*--Required--*
-- **mask** *(str)*: Program mask to use
+- **mask** *(PathOrStr)*:
+
+ Program mask to use
+
+
+- **replace_dict** *(DictAsString)*:
+
+ Dict containing the str to replace as keys and values a list of
+ parameters to replace
+
-- **replace_dict** *(DictAsString)*: Dict containing the str to replace as
- keys and values a list of parameters to replace
+- **working_directory** *(PathOrStr)*:
-- **working_directory** *(str)*: Directory where data should be put
+ Directory where data should be put
*--Optional--*
-- **append_jobs**: Flag to rerun job with finer/wider grid,
- already existing points will not be reexecuted.
+- **append_jobs**:
+
+ Flag to rerun job with finer/wider grid, already existing points will
+ not be reexecuted.
+
+ action: ``store_true``
+
+
+- **check_files** *(str)*:
+
+ List of files/file-name-masks expected to be in the 'job_output_dir'
+ after a successful job (for appending/resuming). Uses the 'glob'
+ function, so unix-wildcards (*) are allowed. If not given, only the
+ presence of the folder itself is checked.
+
+
+- **dryrun**:
+
+ Flag to only prepare folders and scripts, but does not start/submit
+  jobs. Together with `resume_jobs` this can be used to check which jobs
+ succeeded and which failed.
+
+ action: ``store_true``
+
+
+- **executable** *(PathOrStr)*:
+
+ Path to executable or job-type (of ['madx', 'python3', 'python2']) to
+ use.
+
+ default: ``madx``
+
+
+- **htc_arguments** *(DictAsString)*:
+
+ Additional arguments for htcondor, as Dict-String. For AccountingGroup
+ please use 'accounting_group'. 'max_retries' and 'notification' have
+ defaults (if not given). Others are just passed on.
+
+ default: ``{}``
+
+
+- **job_output_dir** *(str)*:
+
+ The name of the output dir of the job. (Make sure your script puts its
+ data there!)
+
+ default: ``Outputdata``
+
+
+- **jobflavour** *(str)*:
+
+ Jobflavour to give rough estimate of runtime of one job
+
+ choices: ``('espresso', 'microcentury', 'longlunch', 'workday', 'tomorrow', 'testmatch', 'nextweek')``
+
+ default: ``workday``
+
+
+- **jobid_mask** *(str)*:
+
+ Mask to name jobs from replace_dict
+
+
+- **num_processes** *(int)*:
+
+ Number of processes to be used if run locally
+
+ default: ``4``
+
+
+- **output_destination** *(PathOrStr)*:
+
+ Directory to copy the output of the jobs to, sorted into folders per job.
+  Can be on EOS, preferably via EOS-URI format ('root://eosuser.cern.ch//eos/...').
+
+
+- **resume_jobs**:
+
+ Only do jobs that did not work.
+
+ action: ``store_true``
- Action: ``store_true``
-- **check_files** *(str)*: List of files/file-name-masks expected to be in the
- 'job_output_dir' after a successful job (for appending/resuming). Uses the 'glob'
- function, so unix-wildcards (*) are allowed. If not given, only the presence of the folder itself is checked.
-- **dryrun**: Flag to only prepare folders and scripts,
- but does not start/submit jobs.
- Together with `resume_jobs` this can be use to check which jobs succeeded and which failed.
- Action: ``store_true``
-- **executable** *(str)*: Path to executable or job-type (of ['madx', 'python3', 'python2']) to use.
+- **run_local**:
-- **htc_arguments** *(DictAsString)*: Additional arguments for htcondor, as Dict-String.
- For AccountingGroup please use 'accounting_group'. 'max_retries' and 'notification' have defaults (if not given).
- Others are just passed on.
+ Flag to run the jobs on the local machine. Not suggested.
- Default: ``{}``
-- **job_output_dir** *(str)*: The name of the output dir of the job. (Make sure your script puts its data there!)
+ action: ``store_true``
- Default: ``Outputdata``
-- **jobflavour** *(str)*: Jobflavour to give rough estimate of runtime of one job
- Choices: ``('espresso', 'microcentury', 'longlunch', 'workday', 'tomorrow', 'testmatch', 'nextweek')``
- Default: ``workday``
-- **jobid_mask** *(str)*: Mask to name jobs from replace_dict
+- **script_arguments** *(DictAsString)*:
-- **num_processes** *(int)*: Number of processes to be used if run locally
+ Additional arguments to pass to the script, as dict in key-value pairs
+ ('--' need to be included in the keys).
- Default: ``4``
-- **resume_jobs**: Only do jobs that did not work.
+ default: ``{}``
- Action: ``store_true``
-- **run_local**: Flag to run the jobs on the local machine. Not suggested.
- Action: ``store_true``
-- **script_arguments** *(DictAsString)*: Additional arguments to pass to the script,
- as dict in key-value pairs ('--' need to be included in the keys).
+- **script_extension** *(str)*:
- Default: ``{}``
-- **script_extension** *(str)*: New extension for the scripts created from the masks.
- This is inferred automatically for ['madx', 'python3', 'python2']. Otherwise not changed.
+ New extension for the scripts created from the masks. This is inferred
+ automatically for ['madx', 'python3', 'python2']. Otherwise not
+ changed.
-- **ssh** *(str)*: Run htcondor from this machine via ssh (needs access to the `working_directory`)
+
+- **ssh** *(str)*:
+
+ Run htcondor from this machine via ssh (needs access to the
+ `working_directory`)
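+
+
+A minimal, illustrative command-line call could look like the following
+(all paths and values are placeholders and need to be adapted)::
+
+    python -m pylhc_submitter.job_submitter \
+        --working_directory /afs/cern.ch/work/u/user/study \
+        --mask my_madx.mask \
+        --replace_dict "{'SEED': [1, 2, 3]}" \
+        --output_destination root://eosuser.cern.ch//eos/user/u/user/study_output \
+        --jobflavour workday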
-:author: mihofer, jdilly, fesoubel
"""
-import itertools
import logging
-import multiprocessing
-import subprocess
import sys
+from dataclasses import fields
from pathlib import Path
-import numpy as np
-import tfs
from generic_parser import EntryPointParameters, entrypoint
from generic_parser.entry_datatypes import DictAsString
from generic_parser.tools import print_dict_tree
-import pylhc_submitter.htc.utils as htcutils
-from pylhc_submitter.htc.mask import (
- check_percentage_signs_in_mask,
- create_jobs_from_mask,
- find_named_variables_in_mask,
- generate_jobdf_index,
-)
-from pylhc_submitter.htc.utils import (
- COLUMN_JOB_DIRECTORY,
- COLUMN_SHELL_SCRIPT,
- EXECUTEABLEPATH,
- HTCONDOR_JOBLIMIT,
- JOBFLAVOURS,
-)
-from pylhc_submitter.utils.environment_tools import on_windows
-from pylhc_submitter.utils.iotools import PathOrStr, save_config, make_replace_entries_iterable, keys_to_path
+from pylhc_submitter.constants.htcondor import JOBFLAVOURS
+from pylhc_submitter.constants.job_submitter import EXECUTEABLEPATH, SCRIPT_EXTENSIONS
+from pylhc_submitter.submitter.iotools import CreationOpts, create_jobs, is_eos_uri, print_stats
+from pylhc_submitter.submitter.mask import (check_percentage_signs_in_mask,
+ find_named_variables_in_mask, is_mask_file)
+from pylhc_submitter.submitter.runners import RunnerOpts, run_jobs
+from pylhc_submitter.utils.iotools import (PathOrStr, keys_to_path, make_replace_entries_iterable,
+ save_config)
from pylhc_submitter.utils.logging_tools import log_setup
-JOBSUMMARY_FILE = "Jobs.tfs"
-JOBDIRECTORY_PREFIX = "Job"
-COLUMN_JOBID = "JobId"
-CONFIG_FILE = "config.ini"
-
-SCRIPT_EXTENSIONS = {
- "madx": ".madx",
- "python3": ".py",
- "python2": ".py",
-}
-
LOG = logging.getLogger(__name__)
try:
import htcondor
- HAS_HTCONDOR = True
except ImportError:
platform = "macOS" if sys.platform == "darwin" else "windows"
LOG.warning(
f"htcondor python bindings are linux-only. You can still use job_submitter on {platform}, "
"but only for local runs."
)
- HAS_HTCONDOR = False
+ htcondor = None
def get_params():
@@ -245,6 +294,12 @@ def get_params():
type=str,
default="Outputdata",
)
+ params.add_parameter(
+ name="output_destination",
+ help="Directory to copy the output of the jobs to, sorted into folders per job. "
+             "Can be on EOS, preferably via EOS-URI format ('root://eosuser.cern.ch//eos/...').",
+ type=PathOrStr,
+ )
params.add_parameter(
name="htc_arguments",
help=(
@@ -273,214 +328,18 @@ def main(opt):
else:
LOG.info("Starting Job-submitter.")
- opt = _check_opts(opt)
- save_config(opt.working_directory, opt, "job_submitter")
-
- job_df = _create_jobs(
- opt.working_directory,
- opt.mask,
- opt.jobid_mask,
- opt.replace_dict,
- opt.job_output_dir,
- opt.append_jobs,
- opt.executable,
- opt.script_arguments,
- opt.script_extension,
- )
- job_df, dropped_jobs = _drop_already_ran_jobs(
- job_df, opt.resume_jobs or opt.append_jobs, opt.job_output_dir, opt.check_files
- )
+ save_config(Path(opt.working_directory), opt, "job_submitter")
+ creation_opt, runner_opt = check_opts(opt)
- if opt.run_local and not opt.dryrun:
- _run_local(job_df, opt.num_processes)
- else:
- _run_htc(
- job_df,
- opt.working_directory,
- opt.job_output_dir,
- opt.jobflavour,
- opt.ssh,
- opt.dryrun,
- opt.htc_arguments,
- )
- if opt.dryrun:
- _print_stats(job_df.index, dropped_jobs)
-
-
-# Main Functions ---------------------------------------------------------------
-
-
-def _create_jobs(
- cwd,
- mask_path_or_string,
- jobid_mask,
- replace_dict,
- output_dir,
- append_jobs,
- executable,
- script_args,
- script_extension,
-) -> tfs.TfsDataFrame:
- LOG.debug("Creating Jobs.")
- values_grid = np.array(list(itertools.product(*replace_dict.values())), dtype=object)
-
- if append_jobs:
- jobfile_path = cwd / JOBSUMMARY_FILE
- try:
- job_df = tfs.read(str(jobfile_path.absolute()), index=COLUMN_JOBID)
- except FileNotFoundError as filerror:
- raise FileNotFoundError(
- "Cannot append jobs, as no previous jobfile was found at " f"'{jobfile_path}'"
- ) from filerror
- mask = [elem not in job_df[replace_dict.keys()].values for elem in values_grid]
- njobs = mask.count(True)
- values_grid = values_grid[mask]
- else:
- njobs = len(values_grid)
- job_df = tfs.TfsDataFrame()
-
- if njobs == 0:
- raise ValueError(f"No (new) jobs found!")
- if njobs > HTCONDOR_JOBLIMIT:
- LOG.warning(
- f"You are attempting to submit an important number of jobs ({njobs})."
- "This can be a high stress on your system, make sure you know what you are doing."
- )
-
- LOG.debug(f"Initial number of jobs: {njobs:d}")
- data_df = tfs.TfsDataFrame(
- index=generate_jobdf_index(job_df, jobid_mask, replace_dict.keys(), values_grid),
- columns=list(replace_dict.keys()),
- data=values_grid,
- )
- job_df = tfs.concat([job_df, data_df], sort=False, how_headers='left')
- job_df = _setup_folders(job_df, cwd)
-
- if htcutils.is_mask_file(mask_path_or_string):
- LOG.debug("Creating all jobs from mask.")
- script_extension = _get_script_extension(script_extension, executable, mask_path_or_string)
- job_df = create_jobs_from_mask(
- job_df, mask_path_or_string, replace_dict.keys(), script_extension
- )
-
- LOG.debug("Creating shell scripts for submission.")
- job_df = htcutils.write_bash(
- job_df,
- output_dir,
- executable=executable,
- cmdline_arguments=script_args,
- mask=mask_path_or_string,
- )
-
- job_df[COLUMN_JOB_DIRECTORY] = job_df[COLUMN_JOB_DIRECTORY].apply(str)
- tfs.write(str(cwd / JOBSUMMARY_FILE), job_df, save_index=COLUMN_JOBID)
- return job_df
-
-
-def _drop_already_ran_jobs(
- job_df: tfs.TfsDataFrame, drop_jobs: bool, output_dir: str, check_files: str
-):
- LOG.debug("Dropping already finished jobs, if necessary.")
- finished_jobs = []
- if drop_jobs:
- finished_jobs = [
- idx
- for idx, row in job_df.iterrows()
- if _job_was_successful(row, output_dir, check_files)
- ]
- LOG.info(
- f"{len(finished_jobs):d} of {len(job_df.index):d}"
- " Jobs have already finished and will be skipped."
- )
- job_df = job_df.drop(index=finished_jobs)
- return job_df, finished_jobs
-
-
-def _run_local(job_df: tfs.TfsDataFrame, num_processes: int) -> None:
- LOG.info(f"Running {len(job_df.index)} jobs locally in {num_processes:d} processes.")
- pool = multiprocessing.Pool(processes=num_processes)
- res = pool.map(_execute_shell, job_df.iterrows())
- if any(res):
- LOG.error("At least one job has failed.")
- raise RuntimeError("At least one job has failed. Check output logs!")
-
-
-def _run_htc(
- job_df: tfs.TfsDataFrame,
- cwd: str,
- output_dir: str,
- flavour: str,
- ssh: str,
- dryrun: bool,
- additional_htc_arguments: DictAsString,
-) -> None:
- LOG.info(f"Submitting {len(job_df.index)} jobs on htcondor, flavour '{flavour}'.")
- LOG.debug("Creating htcondor subfile.")
- subfile = htcutils.make_subfile(
- cwd, job_df, output_dir=output_dir, duration=flavour, **additional_htc_arguments
- )
- if not dryrun:
- LOG.debug("Submitting jobs to htcondor.")
- htcutils.submit_jobfile(subfile, ssh)
+ job_df, dropped_jobs = create_jobs(creation_opt)
+ run_jobs(job_df, runner_opt)
-def _get_script_extension(script_extension: str, executable: PathOrStr, mask: PathOrStr) -> str:
- if script_extension is not None:
- return script_extension
- return SCRIPT_EXTENSIONS.get(executable, mask.suffix)
+ print_stats(job_df.index, dropped_jobs)
-# Sub Functions ----------------------------------------------------------------
-
-
-def _check_htcondor_presence() -> None:
- """Checks the ``HAS_HTCONDOR`` variable and raises EnvironmentError if it is ``False``."""
- if not HAS_HTCONDOR:
- raise EnvironmentError("htcondor bindings are necessary to run this module.")
-
-
-def _setup_folders(job_df: tfs.TfsDataFrame, working_directory: PathOrStr) -> tfs.TfsDataFrame:
- def _return_job_dir(job_id):
- return working_directory / f"{JOBDIRECTORY_PREFIX}.{job_id}"
-
- LOG.debug("Setting up folders: ")
- job_df[COLUMN_JOB_DIRECTORY] = [_return_job_dir(id_) for id_ in job_df.index]
-
- for job_dir in job_df[COLUMN_JOB_DIRECTORY]:
- try:
- job_dir.mkdir()
- except IOError:
- LOG.debug(f" failed '{job_dir}' (might already exist).")
- else:
- LOG.debug(f" created '{job_dir}'.")
- return job_df
-
-
-def _job_was_successful(job_row, output_dir, files) -> bool:
- output_dir = Path(job_row[COLUMN_JOB_DIRECTORY], output_dir)
- success = output_dir.is_dir() and any(output_dir.iterdir())
- if success and files is not None and len(files):
- for f in files:
- success &= len(list(output_dir.glob(f))) > 0
- return success
-
-
-def _execute_shell(df_row) -> int:
- idx, column = df_row
- cmd = [] if on_windows() else ["sh"]
-
- with Path(column[COLUMN_JOB_DIRECTORY], "log.tmp").open("w") as logfile:
- process = subprocess.Popen(
- cmd + [column[COLUMN_SHELL_SCRIPT]],
- shell=on_windows(),
- stdout=logfile,
- stderr=subprocess.STDOUT,
- cwd=column[COLUMN_JOB_DIRECTORY],
- )
- return process.wait()
-
-
-def _check_opts(opt):
+def check_opts(opt):
+ """ Checks options and sorts them into job-creation and running parameters. """
LOG.debug("Checking options.")
if opt.resume_jobs and opt.append_jobs:
raise ValueError("Select either Resume jobs or Append jobs")
@@ -491,15 +350,21 @@ def _check_opts(opt):
if str(opt.executable) in EXECUTEABLEPATH.keys():
opt.executable = str(opt.executable)
- if htcutils.is_mask_file(opt.mask):
- mask = Path(opt.mask).read_text() # checks that mask and dir are there
- opt["mask"] = Path(opt["mask"])
+ if is_mask_file(opt.mask):
+ mask_content = Path(opt.mask).read_text() # checks that mask and dir are there
+ opt.mask = Path(opt.mask)
else:
- mask = opt.mask
+ mask_content = opt.mask
+
+ if is_eos_uri(opt.output_destination) and not ("://" in opt.output_destination and "//eos" in opt.output_destination):
+ raise ValueError(
+ "The 'output_destination' is an EOS-URI but missing '://' or '//eos' (double slashes?). "
+ )
+
# Replace dict ---
dict_keys = set(opt.replace_dict.keys())
- mask_keys = find_named_variables_in_mask(mask)
+ mask_keys = find_named_variables_in_mask(mask_content)
not_in_mask = dict_keys - mask_keys
not_in_dict = mask_keys - dict_keys
@@ -519,25 +384,24 @@ def _check_opts(opt):
[opt.replace_dict.pop(key) for key in not_in_mask]
if len(opt.replace_dict) == 0:
raise KeyError("Empty replace-dictionary")
- check_percentage_signs_in_mask(mask)
+ check_percentage_signs_in_mask(mask_content)
print_dict_tree(opt, name="Input parameter", print_fun=LOG.debug)
opt.replace_dict = make_replace_entries_iterable(opt.replace_dict)
- return opt
-
-
-def _print_stats(new_jobs, finished_jobs):
- """Print some quick statistics."""
- LOG.info("------------- QUICK STATS ----------------")
- LOG.info(f"Jobs total:{len(new_jobs) + len(finished_jobs):d}")
- LOG.info(f"Jobs to run: {len(new_jobs):d}")
- LOG.info(f"Jobs already finished: {len(finished_jobs):d}")
- LOG.info("---------- JOBS TO RUN: NAMES -------------")
- for job_name in new_jobs:
- LOG.info(job_name)
- LOG.info("--------- JOBS FINISHED: NAMES ------------")
- for job_name in finished_jobs:
- LOG.info(job_name)
+
+ # Create new classes
+ opt.output_dir = opt.job_output_dir # renaming
+
+ creation = CreationOpts(**{f.name: opt[f.name] for f in fields(CreationOpts)})
+ runner = RunnerOpts(**{f.name: opt[f.name] for f in fields(RunnerOpts)})
+ runner.output_dir = None if opt.output_destination else opt.output_dir
+ return creation, runner
+
+
+def _check_htcondor_presence() -> None:
+ """ Raises an error if htcondor is not installed. """
+ if htcondor is None:
+ raise EnvironmentError("htcondor bindings are necessary to run this module.")
# Script Mode ------------------------------------------------------------------
diff --git a/pylhc_submitter/sixdesk_tools/utils.py b/pylhc_submitter/sixdesk_tools/utils.py
index 2c629fb..d287eb0 100644
--- a/pylhc_submitter/sixdesk_tools/utils.py
+++ b/pylhc_submitter/sixdesk_tools/utils.py
@@ -10,7 +10,7 @@
from pylhc_submitter.constants.autosix import SIXDESKLOCKFILE, get_workspace_path
from pylhc_submitter.constants.external_paths import SIXDESK_UTILS
-from pylhc_submitter.htc.mask import find_named_variables_in_mask
+from pylhc_submitter.submitter.mask import find_named_variables_in_mask
LOG = logging.getLogger(__name__)
diff --git a/pylhc_submitter/htc/__init__.py b/pylhc_submitter/submitter/__init__.py
similarity index 100%
rename from pylhc_submitter/htc/__init__.py
rename to pylhc_submitter/submitter/__init__.py
diff --git a/pylhc_submitter/htc/utils.py b/pylhc_submitter/submitter/htc_utils.py
similarity index 51%
rename from pylhc_submitter/htc/utils.py
rename to pylhc_submitter/submitter/htc_utils.py
index 80f2bc2..873fd23 100644
--- a/pylhc_submitter/htc/utils.py
+++ b/pylhc_submitter/submitter/htc_utils.py
@@ -15,68 +15,58 @@
import logging
import subprocess
from pathlib import Path
-from typing import Union
+from typing import Any, Dict, List, Union
from pandas import DataFrame
-from pylhc_submitter.utils.environment_tools import on_windows
+from pylhc_submitter.constants.htcondor import (BASH_FILENAME, CMD_SUBMIT, HTCONDOR_JOBLIMIT,
+ JOBFLAVOURS, NOTIFICATIONS, SHEBANG, SUBFILE)
+from pylhc_submitter.constants.job_submitter import (COLUMN_DEST_DIRECTORY, COLUMN_JOB_DIRECTORY,
+ COLUMN_JOB_FILE, COLUMN_SHELL_SCRIPT,
+ EXECUTEABLEPATH, NON_PARAMETER_COLUMNS)
+from pylhc_submitter.submitter import iotools
+from pylhc_submitter.submitter.mask import is_mask_file
+from pylhc_submitter.utils.environment import on_windows
try:
import htcondor
except ImportError: # will be handled by job_submitter
- pass
+ class htcondor:
+ """Dummy HTCondor module. To satisfy the typing. """
+ Submit: Any = None
-from pylhc_submitter.constants.external_paths import MADX_BIN, PYTHON2_BIN, PYTHON3_BIN
LOG = logging.getLogger(__name__)
-SHEBANG = "#!/bin/bash"
-SUBFILE = "queuehtc.sub"
-BASH_FILENAME = "Job"
-
-HTCONDOR_JOBLIMIT = 100000
-
-EXECUTEABLEPATH = {
- "madx": MADX_BIN,
- "python3": PYTHON3_BIN,
- "python2": PYTHON2_BIN,
-}
-
-
-CMD_SUBMIT = "condor_submit"
-JOBFLAVOURS = (
- "espresso", # 20 min
- "microcentury", # 1 h
- "longlunch", # 2 h
- "workday", # 8 h
- "tomorrow", # 1 d
- "testmatch", # 3 d
- "nextweek", # 1 w
-)
-
-NOTIFICATIONS = ("always", "complete", "error", "never")
-
-
-COLUMN_SHELL_SCRIPT = "ShellScript"
-COLUMN_JOB_DIRECTORY = "JobDirectory"
-COLUMN_JOB_FILE = "JobFile"
-
-
# Subprocess Methods ###########################################################
-def create_subfile_from_job(cwd: Path, job: str):
- """Write file to submit to ``HTCondor``."""
+def create_subfile_from_job(cwd: Path, submission: Union[str, htcondor.Submit]) -> Path:
+ """
+ Write file to submit to ``HTCondor``.
+
+ Args:
+ cwd (Path): working directory
+ submission (str, htcondor.Submit): HTCondor submission definition (i.e. content of the file)
+
+ Returns:
+ Path: path to sub-file
+ """
subfile = cwd / SUBFILE
LOG.debug(f"Writing sub-file '{str(subfile)}'.")
with subfile.open("w") as f:
- f.write(str(job))
+ f.write(str(submission))
return subfile
-def submit_jobfile(jobfile: Path, ssh: str):
- """Submit subfile to ``HTCondor`` via subprocess."""
+def submit_jobfile(jobfile: Path, ssh: str) -> None:
+ """Submit subfile to ``HTCondor`` via subprocess.
+
+ Args:
+ jobfile (Path): path to sub-file
+ ssh (str): ssh target
+ """
proc_args = [CMD_SUBMIT, jobfile]
if ssh:
proc_args = ["ssh", ssh] + proc_args
@@ -87,7 +77,15 @@ def submit_jobfile(jobfile: Path, ssh: str):
LOG.info("Jobs successfully submitted.")
-def _start_subprocess(command):
+def _start_subprocess(command: List[str]) -> int:
+ """ Start subprocess and log output.
+
+ Args:
+ command (List[str]): command to execute
+
+ Returns:
+ int: return code of the process
+ """
LOG.debug(f"Executing command '{command}'")
process = subprocess.Popen(
command, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
@@ -102,9 +100,10 @@ def _start_subprocess(command):
# Job Creation #################################################################
-def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs):
+def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs) -> str:
"""
- Function to create an ``HTCondor`` job assuming n_files bash-files.
+    Function to create the ``HTCondor`` submission content for all job-scripts,
+ i.e. bash-files, in the job_df.
Keyword Args:
output_dir (str): output directory that will be transferred. Defaults to ``None``.
@@ -114,7 +113,11 @@ def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs):
retries (int): maximum amount of retries. Default to ``3``.
notification (str): Notify under certain conditions. Defaults to ``error``.
priority (int): Priority to order your jobs. Defaults to ``None``.
+
+ Returns:
+ str: HTCondor submission definition.
"""
+ # Pre-defined HTCondor arguments for our jobs
submit_dict = {
"MyId": "htcondor",
"universe": "vanilla",
@@ -125,9 +128,10 @@ def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs):
"on_exit_remove": "(ExitBySignal == False) && (ExitCode == 0)",
"requirements": "Machine =!= LastRemoteHost",
}
- submit_dict.update(_map_kwargs(kwargs))
-
- job = htcondor.Submit(submit_dict)
+ submit_dict.update(map_kwargs(kwargs))
+
+ # Let the htcondor create the submit-file
+ submission = htcondor.Submit(submit_dict)
# add the multiple bash files
scripts = [
@@ -137,20 +141,27 @@ def create_multijob_for_bashfiles(job_df: DataFrame, **kwargs):
args = [",".join(parts) for parts in zip(scripts, job_df[COLUMN_JOB_DIRECTORY])]
queueArgs = ["queue executable, initialdir from (", *args, ")"]
- # ugly but job.setQArgs doesn't take string containing \n
- # job.setQArgs("\n".join(queueArgs))
- job = str(job) + "\n".join(queueArgs)
- LOG.debug(f"Created HTCondor subfile with content: \n{job}")
- return job
+ # ugly but submission.setQArgs doesn't take string containing '\n':
+ # submission.setQArgs("\n".join(queueArgs)) # doesn't work
+ submission = str(submission) + "\n".join(queueArgs)
+ LOG.debug(f"Created HTCondor subfile with content: \n{submission}")
+ return submission
# Main functions ###############################################################
-def make_subfile(cwd: Path, job_df: DataFrame, **kwargs):
+def make_subfile(cwd: Path, job_df: DataFrame, **kwargs) -> Path:
"""
Creates submit-file for ``HTCondor``.
For kwargs, see ``create_multijob_for_bashfiles``.
+
+ Args:
+ cwd (Path): working directory
+ job_df (DataFrame): DataFrame containing all the job-information
+
+ Returns:
+ Path: path to the submit-file
"""
job = create_multijob_for_bashfiles(job_df, **kwargs)
return create_subfile_from_job(cwd, job)
@@ -163,56 +174,89 @@ def write_bash(
cmdline_arguments: dict = None,
mask: Union[str, Path] = None,
) -> DataFrame:
- """Write the bash-files to be called by ``HTCondor``."""
+ """
+ Write the bash-files to be called by ``HTCondor``, which in turn call the executable.
+    Takes as input the job ``DataFrame``, the job-type (executable) and optional additional commandline arguments for the script.
+ A shell script is created in each job directory in the dataframe.
+
+ Args:
+ job_df (DataFrame): DataFrame containing all the job-information
+ output_dir (str): output directory that will be transferred. Defaults to ``None``.
+ executable (str): name of the executable. Defaults to ``madx``.
+ cmdline_arguments (dict): additional commandline arguments for the executable
+ mask (Union[str, Path]): string or path to the mask-file. Defaults to ``None``.
+
+ Returns:
+ DataFrame: The provided ``job_df`` but with added path to the scripts.
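+
+    Illustrative sketch of a generated script, assuming a ``madx`` job, the default
+    ``Outputdata`` output directory and an EOS destination (all paths are placeholders)::
+
+        #!/bin/bash
+        mkdir Outputdata
+        /path/to/madx /afs/.../Job.0/my_madx.madx
+        eos cp -r Outputdata/ root://eosuser.cern.ch//eos/user/u/user/study_output/Job.0/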
+ """
if len(job_df.index) > HTCONDOR_JOBLIMIT:
raise AttributeError("Submitting too many jobs for HTCONDOR")
- cmds = ""
- if cmdline_arguments is not None:
- cmds = f" {' '.join([f'{param} {val}' for param, val in cmdline_arguments.items()])}"
-
- if executable is None:
- exec_path = ''
- else:
- exec_path = f"{str(EXECUTEABLEPATH.get(executable, executable))} "
+ exec_path = f"{str(EXECUTEABLEPATH.get(executable, executable))} " if executable else ''
+ cmds = f" {' '.join([f'{param} {val}' for param, val in cmdline_arguments.items()])}" if cmdline_arguments else ''
shell_scripts = [None] * len(job_df.index)
for idx, (jobid, job) in enumerate(job_df.iterrows()):
job_dir = Path(job[COLUMN_JOB_DIRECTORY])
bash_file_name = f"{BASH_FILENAME}.{jobid}.{'bat' if on_windows() else 'sh'}"
jobfile = job_dir / bash_file_name
+
LOG.debug(f"Writing bash-file {idx:d} '{jobfile}'.")
with open(jobfile, "w") as f:
+ # Preparation ---
if not on_windows():
- f.write(f"{SHEBANG}\n")
+ f.write(f"{SHEBANG}\n")
+
if output_dir is not None:
f.write(f"mkdir {str(output_dir)}\n")
+
+ # The actual job execution ---
f.write(exec_path)
+ # Call the mask-file or the filled-template string
if is_mask_file(mask):
f.write(str(job_dir / job[COLUMN_JOB_FILE]))
else:
- replace_columns = [column for column in job.index.tolist() if column not in [COLUMN_SHELL_SCRIPT, COLUMN_JOB_DIRECTORY, COLUMN_JOB_FILE]]
+ replace_columns = [column for column in job.index.tolist() if column not in NON_PARAMETER_COLUMNS]
f.write(mask % dict(zip(replace_columns, job[replace_columns])))
+
+ # Additional commands for the mask/string
f.write(cmds)
f.write("\n")
+
+ # Manually copy output (if needed) ---
+ dest_dir = job.get(COLUMN_DEST_DIRECTORY)
+ if output_dir and dest_dir and output_dir != dest_dir:
+ if iotools.is_eos_uri(dest_dir):
+ # Note: eos-cp needs `/` at the end of both, source and target, dirs...
+ cp_command = f'eos cp -r {_str_ending_with_slash(output_dir)} {_str_ending_with_slash(dest_dir)}'
+ else:
+ # ...but '/' at the end of source dir copies only the content on macOS.
+ cp_command = f'cp -r {output_dir} {_str_ending_with_slash(dest_dir)}'
+
+ f.write(f'{cp_command}\n')
+
shell_scripts[idx] = bash_file_name
+
job_df[COLUMN_SHELL_SCRIPT] = shell_scripts
return job_df
-# Helper #######################################################################
+def map_kwargs(add_dict: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Maps the kwargs for the job-file.
+ Some arguments have pre-defined choices and defaults, the remaining ones are just passed on.
+ Args:
+ add_dict (Dict[str, Any]): additional kwargs to add to the defaults.
-def _map_kwargs(add_dict):
- """
- Maps the kwargs for the job-file. Some arguments have pre-defined choices and defaults,
- the remaining ones are just passed on.
+ Returns:
+ Dict[str, Any]: The mapped kwargs.
"""
new = {}
- # Predefined ones
- htc_map = {
+ # Predefined mappings
+ htc_map = { # name: mapped_name, choices, default
"duration": ("+JobFlavour", JOBFLAVOURS, "workday"),
"output_dir": ("transfer_output_files", None, None),
"accounting_group": ("+AccountingGroup", None, None),
@@ -223,14 +267,14 @@ def _map_kwargs(add_dict):
try:
value = add_dict.pop(key)
except KeyError:
- if default is not None:
- new[mapped] = default
+ value = default # could be `None`
else:
if choices is not None and value not in choices:
raise TypeError(
f"{key} needs to be one of '{str(choices).strip('[]')}' but "
f"instead was '{value}'"
)
+ if value is not None:
new[mapped] = _maybe_put_in_quotes(mapped, value)
# Pass-Through Arguments
@@ -239,20 +283,22 @@ def _map_kwargs(add_dict):
return new
-def _maybe_put_in_quotes(key, value):
+# Helper #######################################################################
+
+def _maybe_put_in_quotes(key: str, value: Any) -> Any:
+ """ Put value in quoted strings if key starts with '+' """
if key.startswith("+"):
return f'"{value}"'
return value
-def is_mask_file(mask):
- try:
- return Path(mask).is_file()
- except OSError:
- return False
+def _str_ending_with_slash(s: Union[Path, str]) -> str:
+ """ Add a slash at the end of a path if not present. """
+ s = str(s)
+ if s.endswith("/"):
+ return s
+ return f"{s}/"
-def is_mask_string(mask):
- return not is_mask_file(mask)
# Script Mode ##################################################################
diff --git a/pylhc_submitter/submitter/iotools.py b/pylhc_submitter/submitter/iotools.py
new file mode 100644
index 0000000..5c66c90
--- /dev/null
+++ b/pylhc_submitter/submitter/iotools.py
@@ -0,0 +1,318 @@
+"""
+Job Submitter IO-Tools
+----------------------
+
+Tools for input and output for the job-submitter.
+"""
+import itertools
+import logging
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Dict, List, Sequence, Tuple, Union
+
+import numpy as np
+import pandas as pd
+import tfs
+
+from pylhc_submitter.constants.htcondor import HTCONDOR_JOBLIMIT
+from pylhc_submitter.constants.job_submitter import (COLUMN_DEST_DIRECTORY, COLUMN_JOB_DIRECTORY,
+ COLUMN_JOBID, JOBDIRECTORY_PREFIX,
+ JOBSUMMARY_FILE, SCRIPT_EXTENSIONS)
+from pylhc_submitter.submitter import htc_utils
+from pylhc_submitter.submitter.mask import (create_job_scripts_from_mask, generate_jobdf_index,
+ is_mask_file)
+
+LOG = logging.getLogger(__name__)
+
+
+@dataclass
+class CreationOpts:
+ """ Options for creating jobs. """
+ working_directory: Path # Path to working directory (afs)
+ mask: Union[Path, str] # Path to mask file or mask-string
+ jobid_mask: str # Mask for jobid
+ replace_dict: Dict[str, Any] # Replace-dict
+ output_dir: Path # Path to local output directory
+ output_destination: Union[Path, str] # Path or URI to remote output directory (e.g. eos)
+ append_jobs: bool # Append jobs to existing jobs
+ resume_jobs: bool # Resume jobs that have already run/failed/got interrupted
+ executable: str # Name of executable to call the script (from mask)
+ check_files: Sequence[str] # List of output files to check for success
+ script_arguments: Dict[str, Any] # Arguments to pass to script
+ script_extension: str # Extension of the script to run
+
+ def should_drop_jobs(self) -> bool:
+ """ Check if jobs should be dropped after creating the whole parameter space,
+ e.g. because they already exist. """
+ return self.append_jobs or self.resume_jobs
+
+
+def create_jobs(opt: CreationOpts) -> Tuple[tfs.TfsDataFrame, List[str]]:
+ """Main function to prepare all the jobs and folder structure.
+    This creates the value-grid based on the replace-dict and
+ checks for existing jobs (if so desired).
+ A job-dataframe is created - and written out - containing all the information and
+ its values are used to generate the job-scripts.
+ It also creates bash-scripts to call the executable for the job-scripts.
+
+ Args:
+ opt (CreationOpts): Options for creating jobs
+
+    Returns:
+        Tuple[tfs.TfsDataFrame, List[str]]: The job-dataframe containing the information for
+        all jobs and the list of already finished jobs that were dropped.
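+
+    A minimal, illustrative call (all values are placeholders; normally the options
+    are assembled from the parsed commandline arguments in
+    :func:`pylhc_submitter.job_submitter.check_opts`)::
+
+        opts = CreationOpts(
+            working_directory=Path("/afs/cern.ch/work/u/user/study"),
+            mask=Path("my_madx.mask"),
+            jobid_mask="seed_%(SEED)d",
+            replace_dict={"SEED": [1, 2, 3]},
+            output_dir="Outputdata",
+            output_destination=None,
+            append_jobs=False,
+            resume_jobs=False,
+            executable="madx",
+            check_files=[],
+            script_arguments={},
+            script_extension=".madx",
+        )
+        job_df, dropped_jobs = create_jobs(opts)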
+ """
+ LOG.debug("Creating Jobs.")
+
+ # Generate product of replace-dict and compare to existing jobs ---
+ parameters, values_grid, prev_job_df = _generate_parameter_space(
+ replace_dict=opt.replace_dict,
+ append_jobs=opt.append_jobs,
+ cwd=opt.working_directory,
+ )
+
+ # Check new jobs ---
+ njobs = len(values_grid)
+ if njobs == 0:
+ raise ValueError(f"No (new) jobs found!")
+
+ if njobs > HTCONDOR_JOBLIMIT:
+ LOG.warning(
+            f"You are attempting to submit a large number of jobs ({njobs}). "
+            "This can put a high load on your system, make sure you know what you are doing."
+ )
+
+ LOG.debug(f"Initial number of jobs: {njobs:d}")
+
+ # Generate new job-dataframe ---
+ job_df = tfs.TfsDataFrame(
+ index=generate_jobdf_index(prev_job_df, opt.jobid_mask, parameters, values_grid),
+ columns=parameters,
+ data=values_grid,
+ )
+ job_df = tfs.concat([prev_job_df, job_df], sort=False, how_headers='left')
+
+ # Setup folders ---
+ job_df = create_folders(job_df, opt.working_directory, opt.output_destination)
+
+ # Create scripts ---
+ if is_mask_file(opt.mask):
+ LOG.debug("Creating all jobs from mask.")
+ script_extension = _get_script_extension(opt.script_extension, opt.executable, opt.mask)
+ job_df = create_job_scripts_from_mask(
+ job_df, opt.mask, parameters, script_extension
+ )
+
+ LOG.debug("Creating shell scripts.")
+ job_df = htc_utils.write_bash(
+ job_df,
+ output_dir=opt.output_dir,
+ executable=opt.executable,
+ cmdline_arguments=opt.script_arguments,
+ mask=opt.mask,
+ )
+
+ # Convert paths to strings and write df to file ---
+ job_df[COLUMN_JOB_DIRECTORY] = job_df[COLUMN_JOB_DIRECTORY].apply(str)
+ if COLUMN_DEST_DIRECTORY in job_df.columns:
+ job_df[COLUMN_DEST_DIRECTORY] = job_df[COLUMN_DEST_DIRECTORY].apply(str)
+
+ tfs.write(str(opt.working_directory / JOBSUMMARY_FILE), job_df, save_index=COLUMN_JOBID)
+
+ # Drop already run jobs ---
+ dropped_jobs = []
+ if opt.should_drop_jobs():
+ job_df, dropped_jobs = _drop_already_run_jobs(
+ job_df, opt.output_dir, opt.check_files
+ )
+ return job_df, dropped_jobs
+
+
+def create_folders(job_df: tfs.TfsDataFrame, working_directory: Path,
+ destination_directory: Union[Path, str] = None) -> tfs.TfsDataFrame:
+ """Create the folder-structure in the given working directory and the
+ destination directory if given.
+ This creates a folder per job in which then the job-scripts and bash-scripts
+ can be stored later.
+
+ Args:
+ job_df (tfs.TfsDataFrame): DataFrame containing all the job-information
+ working_directory (Path): Path to the working directory
+ destination_directory (Path, optional): Path to the destination directory,
+ i.e. the directory to copy the outputs to manually. Defaults to None.
+
+ Returns:
+ tfs.TfsDataFrame: The job-dataframe again, but with the added paths to the job-dirs.
+ """
+ LOG.debug("Setting up folders: ")
+
+ jobname = f"{JOBDIRECTORY_PREFIX}.{{0}}"
+ job_df[COLUMN_JOB_DIRECTORY] = [working_directory / jobname.format(id_) for id_ in job_df.index]
+
+ for job_dir in job_df[COLUMN_JOB_DIRECTORY]:
+ job_dir.mkdir(exist_ok=True)
+ LOG.debug(f" created '{job_dir}'.")
+
+ if destination_directory:
+ dest_path = uri_to_path(destination_directory)
+ dest_path.mkdir(parents=True, exist_ok=True)
+
+ server = get_server_from_uri(destination_directory)
+ job_df[COLUMN_DEST_DIRECTORY] = [f"{server}{dest_path / jobname.format(id_)}" for id_ in job_df.index]
+
+ # Make some symlinks for easy navigation---
+ # Output directory -> Working Directory
+ sym_submission = dest_path / Path('SUBMISSION_DIR')
+ sym_submission.unlink(missing_ok=True)
+ sym_submission.symlink_to(working_directory.resolve(), target_is_directory=True)
+
+ # Working Directory -> Output Directory
+ sym_destination = working_directory / Path('OUTPUT_DIR')
+ sym_destination.unlink(missing_ok=True)
+ sym_destination.symlink_to(dest_path.resolve(), target_is_directory=True)
+
+ # Create output dirs per job ---
+ for job_dest_dir in job_df[COLUMN_DEST_DIRECTORY]:
+ uri_to_path(job_dest_dir).mkdir(exist_ok=True)
+ LOG.debug(f" created '{job_dest_dir}'.")
+
+ return job_df
+
+
+def is_eos_uri(path: Union[Path, str, None]) -> bool:
+ """ Check if the given path is an EOS-URI as `eos cp` only works with those.
+ E.g.: root://eosuser.cern.ch//eos/user/a/anabramo/banana.txt
+
+    This function deliberately does not check for the double slashes,
+    so that a URI the user malformed by accident is still recognized as an
+    EOS-URI instead of being silently treated as a normal path. The strict
+    check happens in :func:`pylhc_submitter.job_submitter.check_opts`.
+ """
+ if path is None:
+ return False
+
+ parts = Path(path).parts
+ return (
+ len(parts) >= 3 # at least root:, server, path
+ and
+ parts[0].endswith(':')
+ and
+ parts[2] == 'eos'
+ )
+
+
+def uri_to_path(path: Union[Path, str]) -> Path:
+ """ Strip EOS path information from a path.
+ EOS paths for HTCondor can be given as URI. Strip for direct writing.
+ E.g.: root://eosuser.cern.ch//eos/user/a/anabramo/banana.txt
+ """
+ path = Path(path)
+ parts = path.parts
+ if parts[0].endswith(':'):
+ # the first two parts are host info, e.g `file: //host/path`
+ return Path('/', *parts[2:])
+ return path
+
+
+def get_server_from_uri(path: Union[Path, str]) -> str:
+ """ Get server information from a path.
+ E.g.: root://eosuser.cern.ch//eos/user/a/ -> root://eosuser.cern.ch/
+ """
+ path_part = uri_to_path(path)
+ if path_part == Path(path):
+ return ""
+
+ server_part = str(path).replace(str(path_part), '')
+ if server_part.endswith("//"):
+ server_part = server_part[:-1]
+ return server_part
+
+
+def print_stats(new_jobs: Sequence[str], finished_jobs: Sequence[str]):
+ """Print some quick statistics."""
+ text = [
+        "\n------------- QUICK STATS ----------------",
+ f"Jobs total:{len(new_jobs) + len(finished_jobs):d}",
+ f"Jobs to run: {len(new_jobs):d}",
+ f"Jobs already finished: {len(finished_jobs):d}",
+ "---------- JOBS TO RUN: NAMES -------------"
+ ]
+ for job_name in new_jobs:
+ text.append(job_name)
+ text += ["--------- JOBS FINISHED: NAMES ------------"]
+ for job_name in finished_jobs:
+ text.append(job_name)
+ LOG.info("\n".join(text))
+
+
+def _generate_parameter_space(
+ replace_dict: Dict[str, Any], append_jobs: bool, cwd: Path
+ ) -> Tuple[List[str], np.ndarray, tfs.TfsDataFrame]:
+ """ Generate parameter space from replace-dict, check for existing jobs. """
+ LOG.debug("Generating parameter space from replace-dict.")
+ parameters = list(replace_dict.keys())
+ values_grid = _generate_values_grid(replace_dict)
+ if not append_jobs:
+ return parameters, values_grid, tfs.TfsDataFrame()
+
+ jobfile_path = cwd / JOBSUMMARY_FILE
+ try:
+ prev_job_df = tfs.read(str(jobfile_path.absolute()), index=COLUMN_JOBID)
+ except FileNotFoundError as filerror:
+ raise FileNotFoundError(
+ "Cannot append jobs, as no previous jobfile was found at " f"'{jobfile_path}'"
+ ) from filerror
+ new_jobs_mask = [elem not in prev_job_df[parameters].values for elem in values_grid]
+ values_grid = values_grid[new_jobs_mask]
+
+ return parameters, values_grid, prev_job_df
+
+
+def _generate_values_grid(replace_dict: Dict[str, Any]) -> np.ndarray:
+ """ Creates an array of the inner-product of the replace-dict. """
+ return np.array(list(itertools.product(*replace_dict.values())), dtype=object)
+
+
+def _drop_already_run_jobs(
+ job_df: tfs.TfsDataFrame, output_dir: str, check_files: str
+ ) -> Tuple[tfs.TfsDataFrame, List[str]]:
+ """ Check for jobs that have already been run and drop them from current job_df. """
+ LOG.debug("Dropping already finished jobs.")
+ finished_jobs = [
+ idx
+ for idx, row in job_df.iterrows()
+ if _job_was_successful(row, output_dir, check_files)
+ ]
+
+ LOG.info(
+ f"{len(finished_jobs):d} of {len(job_df.index):d}"
+ " Jobs have already finished and will be skipped."
+ )
+
+ job_df = job_df.drop(index=finished_jobs)
+ return job_df, finished_jobs
+
+
+def _job_was_successful(job_row: pd.Series, output_dir: str, files: Sequence[str]) -> bool:
+ """ Determines if the job was successful.
+
+ Args:
+ job_row (pd.Series): row from the job_df
+ output_dir (str): Name of the (local) output directory
+ files (List[str]): list of files that should have been generated
+ """
+ job_dir = job_row.get(COLUMN_DEST_DIRECTORY) or job_row[COLUMN_JOB_DIRECTORY]
+ output_dir = Path(job_dir, output_dir)
+ success = output_dir.is_dir() and any(output_dir.iterdir())
+ if success and files is not None and len(files):
+ for f in files:
+ success &= len(list(output_dir.glob(f))) > 0
+ return success
+
+
+def _get_script_extension(script_extension: str, executable: Path, mask: Path) -> str:
+ """ Returns the extension of the script to run based on
+ either the given value, its executable or the mask. """
+ if script_extension is not None:
+ return script_extension
+ return SCRIPT_EXTENSIONS.get(executable, mask.suffix)
diff --git a/pylhc_submitter/htc/mask.py b/pylhc_submitter/submitter/mask.py
similarity index 60%
rename from pylhc_submitter/htc/mask.py
rename to pylhc_submitter/submitter/mask.py
index 616e10f..3a2dcaa 100644
--- a/pylhc_submitter/htc/mask.py
+++ b/pylhc_submitter/submitter/mask.py
@@ -8,15 +8,17 @@
import logging
import re
from pathlib import Path
+from typing import Iterable, List, Sequence, Set, Union
import pandas as pd
+from numpy.typing import ArrayLike
-from pylhc_submitter.htc.utils import COLUMN_JOB_DIRECTORY, COLUMN_JOB_FILE
+from pylhc_submitter.constants.job_submitter import COLUMN_JOB_DIRECTORY, COLUMN_JOB_FILE
LOG = logging.getLogger(__name__)
-def create_jobs_from_mask(
+def create_job_scripts_from_mask(
job_df: pd.DataFrame, maskfile: Path, replace_keys: dict, file_ext: str
) -> pd.DataFrame:
"""
@@ -44,18 +46,19 @@ def create_jobs_from_mask(
for idx, (jobid, values) in enumerate(job_df.iterrows()):
jobfile_fullpath = (Path(values[COLUMN_JOB_DIRECTORY]) / jobname).with_suffix(file_ext)
- with jobfile_fullpath.open("w") as madxjob:
- madxjob.write(template % dict(zip(replace_keys, values[list(replace_keys)])))
+ with jobfile_fullpath.open("w") as job_file:
+ job_file.write(template % dict(zip(replace_keys, values[list(replace_keys)])))
jobs[idx] = jobfile_fullpath.name
job_df[COLUMN_JOB_FILE] = jobs
return job_df
-def find_named_variables_in_mask(mask: str):
+def find_named_variables_in_mask(mask: str) -> Set[str]:
+ """ Find all variable-names in the mask. """
return set(re.findall(r"%\((\w+)\)", mask))
-def check_percentage_signs_in_mask(mask: str):
+def check_percentage_signs_in_mask(mask: str) -> None:
""" Checks for '%' in the mask, that are not replacement variables. """
cleaned_mask = re.sub(r"%\((\w+)\)", "", mask)
n_signs = cleaned_mask.count("%")
@@ -70,14 +73,42 @@ def check_percentage_signs_in_mask(mask: str):
raise KeyError(f"{n_signs} problematic '%' signs found in template. Please remove.")
-def generate_jobdf_index(old_df, jobid_mask, keys, values):
- """ Generates index for jobdf from mask for job_id naming. """
+def generate_jobdf_index(old_df: pd.DataFrame, jobid_mask: str, keys: Sequence[str], values: ArrayLike
+ ) -> Union[List[str], Iterable[int]]:
+ """ Generates index for jobdf from mask for job_id naming.
+
+ Args:
+ old_df (pd.DataFrame): Existing jobdf.
+ jobid_mask (str): Mask for naming the jobs.
+ keys (Sequence[str]): Keys to be replaced in the mask.
+        values (ArrayLike): Grid of values to fill into the mask.
+
+    Returns:
+        Union[List[str], Iterable[int]]: Index for the jobdf, either a list of strings (the filled jobid_masks) or an integer range.
+ """
if not jobid_mask:
+ # Use integer-range as index, if no mask is given
+ # Start with last index if old_df is not None.
nold = len(old_df.index) if old_df is not None else 0
start = nold-1 if nold > 0 else 0
return range(start, start + values.shape[0])
+
+ # Fill job-id mask
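+    # e.g. jobid_mask="%(PARAM1)s.%(PARAM2)d" with keys ("PARAM1", "PARAM2") and v=("a", 1) yields "a.1"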
return [jobid_mask % dict(zip(keys, v)) for v in values]
+def is_mask_file(mask: str) -> bool:
+ """ Check if given string points to a file. """
+ try:
+ return Path(mask).is_file()
+ except OSError:
+ return False
+
+
+def is_mask_string(mask: str) -> bool:
+ """ Checks that given string does not point to a file. """
+ return not is_mask_file(mask)
+
+
if __name__ == "__main__":
raise EnvironmentError(f"{__file__} is not supposed to run as main.")
diff --git a/pylhc_submitter/submitter/runners.py b/pylhc_submitter/submitter/runners.py
new file mode 100644
index 0000000..c215e07
--- /dev/null
+++ b/pylhc_submitter/submitter/runners.py
@@ -0,0 +1,121 @@
+"""
+Job Submitter Runners
+---------------------
+
+Defines the methods to run the job-submitter, locally or on HTC.
+"""
+import logging
+import multiprocessing
+import subprocess
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Dict, Optional, Tuple
+
+import pandas as pd
+import tfs
+
+from pylhc_submitter.constants.job_submitter import (COLUMN_DEST_DIRECTORY, COLUMN_JOB_DIRECTORY,
+ COLUMN_SHELL_SCRIPT)
+from pylhc_submitter.submitter import htc_utils
+from pylhc_submitter.submitter.iotools import is_eos_uri
+from pylhc_submitter.utils.environment import on_windows
+
+LOG = logging.getLogger(__name__)
+
+
+@dataclass
+class RunnerOpts:
+ """ Options for running the submission. """
+ working_directory: Path # Path to the working directory (e.g. afs)
+    jobflavour: Optional[str] = None  # HTCondor job flavour (i.e. the duration class of the job)
+ output_dir: Optional[str] = None # Name of the output directory, where jobs store data
+ ssh: Optional[str] = None # SSH command
+ dryrun: Optional[bool] = False # Perform only a dry-run, i.e. do all but submit to HTC
+ htc_arguments: Optional[Dict[str, Any]] = None # Arguments to pass on to htc as keywords
+ run_local: Optional[bool] = False # Run jobs locally
+ num_processes: Optional[int] = 4 # Number of processes to run in parallel (locally)
+
+
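+# Illustrative use (a sketch; the path is a placeholder and `job_df` is the frame built by the job-submitter):
+#   opt = RunnerOpts(working_directory=Path("/afs/cern.ch/user/m/mmustermann/htc_temp"),
+#                    jobflavour="workday", output_dir="Outputdir")
+#   run_jobs(job_df, opt)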
+def run_jobs(job_df: tfs.TfsDataFrame, opt: RunnerOpts) -> None:
+ """Selects how to run the jobs.
+
+ Args:
+ job_df (tfs.TfsDataFrame): DataFrame containing all the job-information
+ opt (RunnerOpts): Parameters for the runner
+ """
+ if opt.run_local:
+ run_local(job_df, opt)
+ else:
+ run_htc(job_df, opt)
+
+
+def run_local(job_df: tfs.TfsDataFrame, opt: RunnerOpts) -> None:
+ """Run all jobs locally.
+
+ Args:
+ job_df (tfs.TfsDataFrame): DataFrame containing all the job-information
+ opt (RunnerOpts): Parameters for the runner
+ """
+ if opt.dryrun:
+ LOG.info(f"Dry-run: Skipping local run.")
+ return
+
+ LOG.info(f"Running {len(job_df.index)} jobs locally in {opt.num_processes:d} processes.")
+
+ pool = multiprocessing.Pool(processes=opt.num_processes)
+ res = pool.map(_execute_shell, job_df.iterrows())
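+    # `res` holds the shell return codes; any non-zero code marks a failed job.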
+ if any(res):
+ jobs_failed = [j for r, j in zip(res, job_df.index) if r]
+ LOG.error(f"{len(jobs_failed)} of {len(job_df)} jobs have failed:\n {jobs_failed}")
+ raise RuntimeError("At least one job has failed. Check output logs!")
+
+
+def run_htc(job_df: tfs.TfsDataFrame, opt: RunnerOpts) -> None:
+ """ Create submission file and submit the jobs to ``HTCondor``.
+
+ Args:
+ job_df (tfs.TfsDataFrame): DataFrame containing all the job-information
+ opt (RunnerOpts): Parameters for the runner
+ """
+ LOG.info(f"Submitting {len(job_df.index)} jobs on htcondor, flavour '{opt.jobflavour}'.")
+ LOG.debug("Creating htcondor subfile.")
+
+ subfile = htc_utils.make_subfile(
+ opt.working_directory, job_df,
+ output_dir=opt.output_dir,
+ duration=opt.jobflavour,
+ **opt.htc_arguments
+ )
+
+ if opt.dryrun:
+ LOG.info("Dry run: submission file created, but not submitting jobs to htcondor.")
+ return
+
+ LOG.debug("Submitting jobs to htcondor.")
+ htc_utils.submit_jobfile(subfile, opt.ssh)
+
+
+# Helper #######################################################################
+
+def _execute_shell(df_row: Tuple[Any, pd.Series]) -> int:
+ """ Execute the shell script.
+
+ Args:
+ df_row (Tuple[Any, pd.Series]): Row in the job-dataframe as coming from `iterrows()`,
+ i.e. a tuple of (index, series)
+
+ Returns:
+ int: return code of the process
+ """
+ _, column = df_row
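+    # On Windows the script is run directly through the shell (shell=True); elsewhere it is passed to `sh`.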
+ cmd = [] if on_windows() else ["sh"]
+
+ with Path(column[COLUMN_JOB_DIRECTORY], "log.tmp").open("w") as logfile:
+ process = subprocess.Popen(
+ cmd + [column[COLUMN_SHELL_SCRIPT]],
+ shell=on_windows(),
+ stdout=logfile,
+ stderr=subprocess.STDOUT,
+ cwd=column[COLUMN_JOB_DIRECTORY],
+ )
+ return process.wait()
diff --git a/pylhc_submitter/utils/environment_tools.py b/pylhc_submitter/utils/environment.py
similarity index 100%
rename from pylhc_submitter/utils/environment_tools.py
rename to pylhc_submitter/utils/environment.py
diff --git a/pylhc_submitter/utils/iotools.py b/pylhc_submitter/utils/iotools.py
index 261861e..31a50b5 100644
--- a/pylhc_submitter/utils/iotools.py
+++ b/pylhc_submitter/utils/iotools.py
@@ -4,8 +4,8 @@
Tools for input and output.
"""
-from pathlib import Path
from datetime import datetime
+from pathlib import Path
from typing import Iterable
from generic_parser.entry_datatypes import get_instance_faker_meta
@@ -13,7 +13,6 @@
from pylhc_submitter.constants.general import TIME
-
# Output -----------------------------------------------------------------------
diff --git a/tests/unit/test_job_submitter.py b/tests/unit/test_job_submitter.py
index 308202b..f395840 100644
--- a/tests/unit/test_job_submitter.py
+++ b/tests/unit/test_job_submitter.py
@@ -1,10 +1,14 @@
+import itertools
+from dataclasses import asdict, dataclass, field
from pathlib import Path
+from typing import Any, Dict, List, Optional, Sequence, Union
+import numpy as np
import pytest
-from generic_parser import DotDict
from pylhc_submitter.job_submitter import main as job_submit
-from pylhc_submitter.utils.environment_tools import on_linux, on_windows
+from pylhc_submitter.submitter.iotools import get_server_from_uri, is_eos_uri, uri_to_path
+from pylhc_submitter.utils.environment import on_linux, on_windows
SUBFILE = "queuehtc.sub"
@@ -19,162 +23,286 @@
@pytest.mark.parametrize("maskfile", [True, False])
def test_job_creation_and_localrun(tmp_path, maskfile):
- args, setup = _create_setup(tmp_path, mask_file=maskfile)
- setup.update(run_local=True)
- job_submit(**setup)
- _test_output(args)
+ """ Tests that the jobs are created and can be run locally
+ from mask-string and mask-file. """
+ setup = InputParameters(working_directory=tmp_path, run_local=True)
+ setup.create_mask(as_file=maskfile)
+ job_submit(**asdict(setup))
+ _test_output(setup)
+
+
+def test_output_directory(tmp_path):
+ """ Tests that the output is copied to the output destination.
+    As a by-product, it also tests that the jobs are created and can be run locally. """
+ setup = InputParameters(
+ working_directory=tmp_path,
+ run_local=True,
+ output_destination=tmp_path / "my_new_output" / "long_path",
+ )
+ setup.create_mask()
+ job_submit(**asdict(setup))
+ _test_output(setup)
+
+
+def test_detects_wrong_uri(tmp_path):
+ """ Tests that wrong URI's are identified. """
+ setup = InputParameters(
+ working_directory=tmp_path,
+ run_local=True,
+ output_destination="root:/eosuser.cern.ch/eos/my_new_output",
+ )
+ setup.create_mask()
+ with pytest.raises(ValueError) as e:
+ job_submit(**asdict(setup))
+ assert "EOS-URI" in str(e)
@run_only_on_linux
def test_job_creation_and_localrun_with_multiline_maskstring(tmp_path):
+ """ Tests that the jobs are created and can be run locally from a multiline mask-string. """
mask = "123\"\" \nsleep 0.1 \n/bin/bash -c \"echo \"%(PARAM1)s.%(PARAM2)s"
- args, setup = _create_setup(tmp_path, mask_content=mask, mask_file=False)
- setup.update(run_local=True)
- job_submit(**setup)
- _test_output(args)
+ setup = InputParameters(working_directory=tmp_path, run_local=True)
+ setup.create_mask(content=mask, as_file=False)
+ job_submit(**asdict(setup))
+ _test_output(setup)
@run_only_on_linux
@pytest.mark.parametrize("maskfile", [True, False])
def test_job_creation_and_dryrun(tmp_path, maskfile):
- args, setup = _create_setup(tmp_path, mask_file=maskfile)
- setup.update(dryrun=True)
- job_submit(**setup)
+ """ Tests that the jobs are created as dry-run from mask-file and from mask-string. """
+ setup = InputParameters(working_directory=tmp_path, dryrun=True)
+ setup.create_mask(as_file=maskfile)
+ job_submit(**asdict(setup))
_test_subfile_content(setup)
- _test_output(args, post_run=False)
+ _test_output(setup, post_run=False)
@run_only_on_linux
@pytest.mark.parametrize("maskfile", [True, False])
def test_find_errorneous_percentage_signs(tmp_path, maskfile):
+ """ Tests that a key-error is raised on a mask-string with percentage signs,
+ that are not part of the replacement parameters. """
mask = "%(PARAM1)s.%(PARAM2)d\nsome stuff # should be 5%\nsome % more % stuff."
- args, setup = _create_setup(tmp_path, mask_content=mask, mask_file=maskfile)
+ setup = InputParameters(working_directory=tmp_path)
+ setup.create_mask(content=mask, as_file=maskfile)
with pytest.raises(KeyError) as e:
- job_submit(**setup)
- assert "problematic '%'" in e.value.args[0]
+ job_submit(**asdict(setup))
+ assert "problematic '%'" in str(e)
@run_only_on_linux
@pytest.mark.parametrize("maskfile", [True, False])
def test_missing_keys(tmp_path, maskfile):
+ """ Tests that a key-error is raised on a mask-string with missing keys in the replacement dict. """
mask = "%(PARAM1)s.%(PARAM2)s.%(PARAM3)s"
- args, setup = _create_setup(tmp_path, mask_content=mask, mask_file=maskfile)
+ setup = InputParameters(working_directory=tmp_path)
+ setup.create_mask(content=mask, as_file=maskfile)
with pytest.raises(KeyError) as e:
- job_submit(**setup)
- assert "PARAM3" in e.value.args[0]
+ job_submit(**asdict(setup))
+ assert "PARAM3" in str(e)
@run_if_not_linux
-def test_not_on_linux(tmp_path):
- args, setup = _create_setup(tmp_path)
+def test_htcondor_bindings_not_found_on_nonlinux_os(tmp_path):
+ """ Test that an error is raised if htcondor bindings are not found.
+ If this tests fails, this might mean, that htcondor bindings are finally
+ available for the other platforms. """
+ setup = InputParameters(working_directory=tmp_path)
+ setup.create_mask()
with pytest.raises(EnvironmentError) as e:
- job_submit(**setup)
- assert "htcondor bindings" in e.value.args[0]
+ job_submit(**asdict(setup))
+ assert "htcondor bindings" in str(e)
+
+
+@pytest.mark.skipif(on_windows(), reason="Paths are not split on '/' on Windows.")
+def test_eos_uri_manipulation_functions():
+ """ Unit-test for the EOS-URI parsing. (OH LOOK! An actual unit test!)"""
+ server = "root://eosuser.cern.ch/"
+ path = "/eos/user/m/mmustermann/"
+ uri = f"{server}{path}"
+ assert is_eos_uri(uri)
+ assert not is_eos_uri(path)
+ assert uri_to_path(uri) == Path(path)
+ assert get_server_from_uri(uri) == server
@run_only_on_linux
@pytest.mark.cern_network
-def test_htc_submit():
- """ This test is here for local testing only. You need to adapt the path
- and delete the results afterwards manually (so you can check them before."""
- user = "jdilly"
- path = Path("/", "afs", "cern.ch", "user", user[0], user, "htc_temp")
+@pytest.mark.parametrize("destination", [True, False])
+@pytest.mark.parametrize("uri", [False, True])
+def test_htc_submit(destination: bool, uri: bool):
+ """ This test is here for manual testing.
+ It runs 3 scenarios and each submits 6 jobs to HTCondor.
+ This means you need to be in the cern-network on a machine with afs and eos access
+ and htcondor installed.
+ You need to adapt the path to your user-name and delete the results afterwards manually.
+
+ Scenarios:
+ a) destination = False: Transfer output data back to afs
+ b) destination = True, uri = False: Copy output data to EOS (via eos path)
+ c) destination = True, uri = True: Copy output data to EOS (via eos uri)
+
+ Run this test twice, manually changing `prerun` from "True" to "False" after the jobs are finished.
+ - `prerun = True`: create the folder structures and submit the jobs.
+ - `prerun = False`: check that the output data is present.
+ """
+ # MANUAL THINGS TO CHANGE ##############################################
+ user = "mmustermann" # set your username
+ tmp_name = "htc_temp" # name for the temporary folder (will be created)
+ prerun = True
+ # prerun = False # switch here after jobs finished.
+
+ # Uncomment to fix the kerberos ticket, in case htcondor doesn't find it.
+ # Do a `klist` in terminal and adapt the path.
+ # import os
+ # os.environ["KRB5CCNAME"] = "/tmp/krb5cc_####"
+ ########################################################################
+ if uri and not destination:
+        return  # only one scenario is needed when no destination is set
+
+ # set working_directory
+ if destination:
+ tmp_name = f"{tmp_name}_dest"
+ if uri:
+ tmp_name = f"{tmp_name}_uri"
+
+ path = Path("/", "afs", "cern.ch", "user", user[0], user, tmp_name)
path.mkdir(exist_ok=True)
- args, setup = _create_setup(path)
-
- job_submit(**setup)
- _test_subfile_content(setup)
- _test_output(args, post_run=False)
- # _test_output(args, post_run=True) # you can use this if you like after htcondor is done
-
-# Helper -----------------------------------------------------------------------
-
-
-def _create_setup(cwd_path: Path, mask_content: str = None, mask_file: bool = True):
- """ Create a quick setup for Parameters PARAM1 and PARAM2. """
- out_name = "out.txt"
- out_dir = "Outputdir"
-
- args = DotDict(
- cwd=cwd_path,
- out_name=out_name,
- out_dir=out_dir,
- id="%(PARAM1)s.%(PARAM2)d",
- mask_name="test_script.mask",
- ext=".bat" if on_windows() else ".sh",
- out_file=Path(out_dir, out_name),
- p1_list=["a", "b"],
- p2_list=[1, 2, 3],
- mask_file=mask_file
- )
-
- mask_string = _make_executable_string(args, mask_content)
- if args.mask_file:
- mask_path = args.cwd / args.mask_name
- with mask_path.open("w") as f:
- f.write(mask_string)
-
- setup = dict(
- executable=None if on_windows() else "/bin/bash",
- script_extension=args.ext,
- job_output_dir=out_dir,
- mask=str(mask_path) if args.mask_file else mask_string,
- replace_dict=dict(PARAM1=args.p1_list, PARAM2=args.p2_list),
- jobid_mask=args.id,
- jobflavour="workday",
- resume_jobs=True,
- check_files=[args.out_name],
- working_directory=str(args.cwd),
- dryrun=False,
- run_local=False,
- htc_arguments={"max_retries": "4", "some_other_argument": "some_other_parameter"},
+ # set output_destination
+ dest = None
+ if destination:
+ dest = f"/eos/user/{user[0]}/{user}/{tmp_name}"
+ if uri:
+ dest = f"root://eosuser.cern.ch/{dest}"
+
+ # create setup
+ setup = InputParameters(
+ working_directory=path,
+ output_destination=dest,
+ # dryrun=True
)
- return args, setup
+ setup.create_mask()
-
-def _make_executable_string(args, mask_content):
- if mask_content is None:
- mask_content = args.id
-
- if on_windows():
- mask_string = f'echo {mask_content}> "{args.out_file}"'
+ if prerun:
+ # submit jobs
+ job_submit(**asdict(setup))
+ _test_subfile_content(setup)
+ _test_output(setup, post_run=False)
else:
- mask_string = f'echo "{mask_content}" > "{args.out_file}"'
- if not args.mask_file:
- mask_string = " ".join(['-c "', mask_string, '"'])
- return f"{mask_string}\n"
+ # check output
+ _test_output(setup, post_run=True)
-def _test_subfile_content(setup):
- subfile = Path(setup['working_directory']) / SUBFILE
+# Helper -----------------------------------------------------------------------
+
+@dataclass
+class InputParameters:
+ """ job_submitter input parameters. """
+ working_directory: Path
+ executable: Optional[str] = None if on_windows() else "/bin/bash"
+    script_extension: Optional[str] = ".bat" if on_windows() else ".sh"
+ job_output_dir: Optional[str] = "Outputdir"
+ jobid_mask: Optional[str] = "%(PARAM1)s.%(PARAM2)d"
+ replace_dict: Optional[Dict] = field(default_factory=lambda: dict(PARAM1=["a", "b"], PARAM2=[1, 2, 3]))
+ jobflavour: Optional[str] = "workday"
+ resume_jobs: Optional[bool] = True
+ check_files: Optional[Sequence] = field(default_factory=lambda: ["out.txt",])
+ dryrun: Optional[bool] = False
+ run_local: Optional[bool] = False
+ htc_arguments: Optional[Dict] = field(default_factory=lambda: {"max_retries": "4", "some_other_argument": "some_other_parameter"})
+ output_destination: Optional[Path] = None
+    mask: Optional[Union[Path, str]] = None  # will be set in create_mask
+
+ def create_mask(self, name: str = "test_script.mask", content: str = None, as_file: bool = False):
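+        """ Create the mask for this setup: a small script that echoes the (formatted) job-id
+        into the expected output file, either written to a mask-file or kept as a mask-string. """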
+ output_file = Path(self.job_output_dir, self.check_files[0])
+
+ if content is None:
+ content = self.jobid_mask
+
+ if on_windows():
+ mask_string = f'echo {content}> "{output_file!s}"'
+ else:
+ mask_string = f'echo "{content}" > "{output_file!s}"'
+ if not as_file:
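+                # the mask-string is wrapped as a `-c` argument for the executable (/bin/bash)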
+ mask_string = " ".join(['-c "', mask_string, '"'])
+
+ mask_string = f"{mask_string}\n"
+
+ if as_file:
+ mask_path = self.working_directory / name
+ with mask_path.open("w") as f:
+ f.write(mask_string)
+ self.mask = mask_path
+ else:
+ self.mask = mask_string
+
+
+def _test_subfile_content(setup: InputParameters):
+ """ Checks some of the content of the subfile (queuehtc.sub). """
+ subfile = setup.working_directory / SUBFILE
assert subfile.exists()
with subfile.open("r") as sfile:
filecontents = dict(line.rstrip().split(" = ") for line in sfile if " = " in line)
- assert filecontents["MY.JobFlavour"].strip('"') == setup["jobflavour"] # flavour is saved with "" in .sub, and read in with them
- assert filecontents["transfer_output_files"] == setup["job_output_dir"]
- for key in setup["htc_arguments"].keys():
- assert filecontents[key] == setup["htc_arguments"][key]
-
-
-def _test_output(args, post_run=True):
- for p1 in args.p1_list:
- for p2 in args.p2_list:
- current_id = args.id % dict(PARAM1=p1, PARAM2=p2)
- job_name = f"Job.{current_id}"
- job_dir_path = args.cwd / job_name
- out_dir_path = job_dir_path / args.out_dir
- out_file_path = out_dir_path / args.out_name
-
- assert job_dir_path.exists()
- assert job_dir_path.is_dir()
- if args.mask_file:
- assert (job_dir_path / args.mask_name).with_suffix(args.ext).exists()
- # assert out_dir_path.exists() # does not seem to be pre-created anymore (jdilly 2021-05-04)
- if post_run:
- assert out_dir_path.is_dir()
- assert out_file_path.exists()
- assert out_file_path.is_file()
-
- with out_file_path.open("r") as f:
- assert f.read().strip("\n") == current_id
+ assert filecontents["MY.JobFlavour"].strip('"') == setup.jobflavour # flavour is saved with "" in .sub, and read in with them
+ if setup.output_destination is None:
+ assert filecontents["transfer_output_files"] == setup.job_output_dir
+ else:
+ assert "transfer_output_files" not in filecontents
+
+ for key in setup.htc_arguments.keys():
+ assert filecontents[key] == setup.htc_arguments[key]
+
+
+def _test_output(setup: InputParameters, post_run: bool = True):
+ """ Checks the validity of the output. """
+
+ combinations = _generate_combinations(setup.replace_dict)
+ assert len(combinations)
+ assert len(combinations) == np.prod([len(v) for v in setup.replace_dict.values()])
+
+ for combination_instance in combinations:
+ current_id = setup.jobid_mask % combination_instance
+ job_name = f"Job.{current_id}"
+
+ if isinstance(setup.mask, Path):
+ assert (setup.working_directory / job_name / setup.mask.name).with_suffix(setup.script_extension).exists()
+
+ def _check_output_content(dir_path: Path, check_output: bool = True):
+ # Check if the code created the folder structure ---
+ job_path = uri_to_path(dir_path) / job_name
+
+ assert job_path.exists()
+ assert job_path.is_dir()
+
+ if check_output: # Check if the jobs created the files ---
+ out_dir_path = job_path / setup.job_output_dir
+ out_file_path = out_dir_path / setup.check_files[0]
+
+ assert out_dir_path.is_dir()
+ assert out_file_path.exists()
+ assert out_file_path.is_file()
+
+ with out_file_path.open("r") as f:
+ assert f.read().strip("\n") == current_id
+
+ # Check local working directory ---
+ _check_output_content(setup.working_directory, check_output=post_run and setup.output_destination is None)
+
+ if setup.output_destination is not None:
+ # Check copy at output destination ---
+ _check_output_content(setup.output_destination, check_output=post_run)
+
+
+def _generate_combinations(data: Dict[str, Sequence]) -> List[Dict[str, Any]]:
+ """ Creates all possible combinations of values in data as a list of dictionaries. """
+ keys = list(data.keys())
+ all_values = [data[key] for key in keys]
+
+ combinations = [
+ {keys[i]: values[i] for i in range(len(keys))}
+ for values in itertools.product(*all_values)
+ ]
+
+ return combinations