diff --git a/src/broker/brokercfg.c b/src/broker/brokercfg.c index 7ce7f4e41b64..f913e52d7cdf 100644 --- a/src/broker/brokercfg.c +++ b/src/broker/brokercfg.c @@ -20,7 +20,6 @@ #include "src/common/libutil/log.h" #include "src/common/libutil/errprintf.h" -#include "src/common/libutil/fsd.h" #include "attr.h" #include "modhash.h" @@ -35,341 +34,10 @@ struct brokercfg { flux_future_t *reload_f; }; -static int validate_policy_jobspec (json_t *o, - const char *key, - const char **default_queue, - flux_error_t *error) -{ - json_error_t jerror; - json_t *duration = NULL; - json_t *queue = NULL; - - if (json_unpack_ex (o, - &jerror, - 0, - "{s?{s?{s?o s?o !} !} !}", - "defaults", - "system", - "duration", &duration, - "queue", &queue) < 0) { - errprintf (error, "%s: %s", key, jerror.text); - goto inval; - } - if (duration) { - double d; - - if (!json_is_string (duration) - || fsd_parse_duration (json_string_value (duration), &d) < 0) { - errprintf (error, - "%s.defaults.system.duration is not a valid FSD", - key); - goto inval; - } - } - if (queue) { - if (!json_is_string (queue)) { - errprintf (error, "%s.defaults.system.queue is not a string", key); - goto inval; - } - } - if (default_queue) - *default_queue = queue ? json_string_value (queue) : NULL; - return 0; -inval: - errno = EINVAL; - return -1; -} - -static int validate_policy_limits_job_size (json_t *o, - const char *key, - const char *key2, - flux_error_t *error) -{ - json_error_t jerror; - int nnodes = -1; - int ncores = -1; - int ngpus = -1; - - if (json_unpack_ex (o, - &jerror, - 0, - "{s?i s?i s?i !}", - "nnodes", &nnodes, - "ncores", &ncores, - "ngpus", &ngpus) < 0) { - errprintf (error, "%s.%s: %s", key, key2, jerror.text); - goto inval; - } - if (nnodes < -1 || ncores < -1 || ngpus < -1) { - errprintf (error, "%s.%s: values must be >= -1", key, key2); - goto inval; - } - return 0; -inval: - errno = EINVAL; - return -1; -} - -static int validate_policy_limits (json_t *o, - const char *key, - flux_error_t *error) -{ - json_error_t jerror; - json_t *job_size = NULL; - json_t *duration = NULL; - - if (json_unpack_ex (o, - &jerror, - 0, - "{s?o s?o !}", - "job-size", &job_size, - "duration", &duration) < 0) { - errprintf (error, "%s: %s", key, jerror.text); - goto inval; - } - if (duration) { - double d; - - if (!json_is_string (duration) - || fsd_parse_duration (json_string_value (duration), &d) < 0) { - errprintf (error, "%s.duration is not a valid FSD", key); - goto inval; - } - } - if (job_size) { - json_t *min = NULL; - json_t *max = NULL; - - if (json_unpack_ex (job_size, - &jerror, - 0, - "{s?o s?o !}", - "min", &min, - "max", &max) < 0) { - errprintf (error, "%s.job-size: %s", key, jerror.text); - goto inval; - } - if (min) { - if (validate_policy_limits_job_size (min, key, "min", error) < 0) - goto inval; - } - if (max) { - if (validate_policy_limits_job_size (max, key, "max", error) < 0) - goto inval; - } - } - return 0; -inval: - errno = EINVAL; - return -1; -} - -static bool is_string_array (json_t *o, const char *banned) -{ - size_t index; - json_t *val; - - if (!json_is_array (o)) - return false; - json_array_foreach (o, index, val) { - if (!json_is_string (val)) - return false; - if (banned) { - for (int i = 0; banned[i] != '\0'; i++) { - if (strchr (json_string_value (val), banned[i])) - return false; - } - } - } - return true; -} - -static int validate_policy_access (json_t *o, - const char *key, - flux_error_t *error) -{ - json_error_t jerror; - json_t *allow_user = NULL; - json_t *allow_group = NULL; - - if (json_unpack_ex (o, - &jerror, - 0, - "{s?o s?o !}", - "allow-user", &allow_user, - "allow-group", &allow_group) < 0) { - errprintf (error, "%s: %s", key, jerror.text); - goto inval; - } - if (allow_user) { - if (!is_string_array (allow_user, NULL)) { - errprintf (error, "%s.allow-user must be a string array", key); - goto inval; - } - } - if (allow_group) { - if (!is_string_array (allow_group, NULL)) { - errprintf (error, "%s.allow-group must be a string array", key); - goto inval; - } - } - return 0; -inval: - errno = EINVAL; - return -1; -} - -/* Validate the policy table as defined by RFC 33. The table can appear at - * the top level of the config or within a queues entry. - */ -static int validate_policy_json (json_t *policy, - const char *key, - const char **default_queue, - flux_error_t *error) -{ - json_error_t jerror; - json_t *jobspec = NULL; - json_t *limits = NULL; - json_t *access = NULL; - json_t *scheduler = NULL; - const char *defqueue = NULL; - char key2[1024]; - - if (json_unpack_ex (policy, - &jerror, - 0, - "{s?o s?o s?o s?o !}", - "jobspec", &jobspec, - "limits", &limits, - "access", &access, - "scheduler", &scheduler) < 0) { - errprintf (error, "%s: %s", key, jerror.text); - errno = EINVAL; - return -1; - } - if (jobspec) { - snprintf (key2, sizeof (key2), "%s.jobspec", key); - if (validate_policy_jobspec (jobspec, key2, &defqueue, error) < 0) - return -1; - } - if (limits) { - snprintf (key2, sizeof (key2), "%s.limits", key); - if (validate_policy_limits (limits, key2, error) < 0) - return -1; - } - if (access) { - snprintf (key2, sizeof (key2), "%s.access", key); - if (validate_policy_access (access, key2, error) < 0) - return -1; - } - if (default_queue) - *default_queue = defqueue; - return 0; -} - -static int validate_policy_config (const flux_conf_t *conf, - const char **default_queue, - flux_error_t *error) -{ - json_t *policy = NULL; - const char *defqueue = NULL; - - if (flux_conf_unpack (conf, - error, - "{s?o}", - "policy", &policy) < 0) - return -1; - if (policy) { - if (validate_policy_json (policy, "policy", &defqueue, error) < 0) - return -1; - } - if (default_queue) - *default_queue = defqueue; - return 0; -} - -static int validate_queues_config (const flux_conf_t *conf, - const char *default_queue, - flux_error_t *error) -{ - json_t *queues = NULL; - - if (flux_conf_unpack (conf, - error, - "{s?o}", - "queues", &queues) < 0) - return -1; - if (queues) { - const char *name; - json_t *entry; - - if (!json_is_object (queues)) { - errprintf (error, "queues must be a table"); - goto inval; - } - json_object_foreach (queues, name, entry) { - json_error_t jerror; - json_t *policy = NULL; - json_t *requires = NULL; - - if (json_unpack_ex (entry, - &jerror, - 0, - "{s?o s?o !}", - "policy", &policy, - "requires", &requires) < 0) { - errprintf (error, "queues.%s: %s", name, jerror.text); - goto inval; - } - if (policy) { - char key[1024]; - const char *defqueue; - snprintf (key, sizeof (key), "queues.%s.policy", name); - if (validate_policy_json (policy, key, &defqueue, error) < 0) - return -1; - if (defqueue) { - errprintf (error, - "%s: queue policy includes default queue!", - key); - goto inval; - } - } - if (requires) { - const char *banned_property_chars = " \t!&'\"`'|()"; - if (!is_string_array (requires, banned_property_chars)) { - errprintf (error, - "queues.%s.requires must be an array of %s", - name, - "property strings (RFC 20)"); - goto inval; - } - } - } - } - if (default_queue) { - if (!queues || !json_object_get (queues, default_queue)) { - errprintf (error, - "default queue '%s' is not in queues table", - default_queue); - goto inval; - } - } - return 0; -inval: - errno = EINVAL; - return -1; -} - static int brokercfg_set (flux_t *h, const flux_conf_t *conf, flux_error_t *error) { - const char *defqueue; - - if (validate_policy_config (conf, &defqueue, error) < 0 - || validate_queues_config (conf, defqueue, error) < 0) - return -1; - if (flux_set_conf (h, conf) < 0) { errprintf (error, "Error caching config object"); return -1; diff --git a/src/common/libfluxutil/Makefile.am b/src/common/libfluxutil/Makefile.am index b3f28aa73611..34d034dfdd33 100644 --- a/src/common/libfluxutil/Makefile.am +++ b/src/common/libfluxutil/Makefile.am @@ -14,7 +14,9 @@ AM_CPPFLAGS = \ noinst_LTLIBRARIES = libfluxutil.la libfluxutil_la_SOURCES = \ method.h \ - method.c + method.c \ + policy.h \ + policy.c libfluxutil_la_CPPFLAGS = \ $(AM_CPPFLAGS) diff --git a/src/common/libfluxutil/policy.c b/src/common/libfluxutil/policy.c new file mode 100644 index 000000000000..be1c0dbc32ec --- /dev/null +++ b/src/common/libfluxutil/policy.c @@ -0,0 +1,360 @@ +/************************************************************\ + * Copyright 2022 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* policy.c - parse and validate RFC 33 queue/policy config tables + */ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include + +#include "src/common/libutil/errprintf.h" +#include "src/common/libutil/fsd.h" + +#include "policy.h" + +static int validate_policy_jobspec (json_t *o, + const char *key, + const char **default_queue, + flux_error_t *error) +{ + json_error_t jerror; + json_t *duration = NULL; + json_t *queue = NULL; + + if (json_unpack_ex (o, + &jerror, + 0, + "{s?{s?{s?o s?o !} !} !}", + "defaults", + "system", + "duration", &duration, + "queue", &queue) < 0) { + errprintf (error, "%s: %s", key, jerror.text); + goto inval; + } + if (duration) { + double d; + + if (!json_is_string (duration) + || fsd_parse_duration (json_string_value (duration), &d) < 0) { + errprintf (error, + "%s.defaults.system.duration is not a valid FSD", + key); + goto inval; + } + } + if (queue) { + if (!json_is_string (queue)) { + errprintf (error, "%s.defaults.system.queue is not a string", key); + goto inval; + } + } + if (default_queue) + *default_queue = queue ? json_string_value (queue) : NULL; + return 0; +inval: + errno = EINVAL; + return -1; +} + +static int validate_policy_limits_job_size (json_t *o, + const char *key, + const char *key2, + flux_error_t *error) +{ + json_error_t jerror; + int nnodes = -1; + int ncores = -1; + int ngpus = -1; + + if (json_unpack_ex (o, + &jerror, + 0, + "{s?i s?i s?i !}", + "nnodes", &nnodes, + "ncores", &ncores, + "ngpus", &ngpus) < 0) { + errprintf (error, "%s.%s: %s", key, key2, jerror.text); + goto inval; + } + if (nnodes < -1 || ncores < -1 || ngpus < -1) { + errprintf (error, "%s.%s: values must be >= -1", key, key2); + goto inval; + } + return 0; +inval: + errno = EINVAL; + return -1; +} + +static int validate_policy_limits (json_t *o, + const char *key, + flux_error_t *error) +{ + json_error_t jerror; + json_t *job_size = NULL; + json_t *duration = NULL; + + if (json_unpack_ex (o, + &jerror, + 0, + "{s?o s?o !}", + "job-size", &job_size, + "duration", &duration) < 0) { + errprintf (error, "%s: %s", key, jerror.text); + goto inval; + } + if (duration) { + double d; + + if (!json_is_string (duration) + || fsd_parse_duration (json_string_value (duration), &d) < 0) { + errprintf (error, "%s.duration is not a valid FSD", key); + goto inval; + } + } + if (job_size) { + json_t *min = NULL; + json_t *max = NULL; + + if (json_unpack_ex (job_size, + &jerror, + 0, + "{s?o s?o !}", + "min", &min, + "max", &max) < 0) { + errprintf (error, "%s.job-size: %s", key, jerror.text); + goto inval; + } + if (min) { + if (validate_policy_limits_job_size (min, key, "min", error) < 0) + goto inval; + } + if (max) { + if (validate_policy_limits_job_size (max, key, "max", error) < 0) + goto inval; + } + } + return 0; +inval: + errno = EINVAL; + return -1; +} + +static bool is_string_array (json_t *o, const char *banned) +{ + size_t index; + json_t *val; + + if (!json_is_array (o)) + return false; + json_array_foreach (o, index, val) { + if (!json_is_string (val)) + return false; + if (banned) { + for (int i = 0; banned[i] != '\0'; i++) { + if (strchr (json_string_value (val), banned[i])) + return false; + } + } + } + return true; +} + +static int validate_policy_access (json_t *o, + const char *key, + flux_error_t *error) +{ + json_error_t jerror; + json_t *allow_user = NULL; + json_t *allow_group = NULL; + + if (json_unpack_ex (o, + &jerror, + 0, + "{s?o s?o !}", + "allow-user", &allow_user, + "allow-group", &allow_group) < 0) { + errprintf (error, "%s: %s", key, jerror.text); + goto inval; + } + if (allow_user) { + if (!is_string_array (allow_user, NULL)) { + errprintf (error, "%s.allow-user must be a string array", key); + goto inval; + } + } + if (allow_group) { + if (!is_string_array (allow_group, NULL)) { + errprintf (error, "%s.allow-group must be a string array", key); + goto inval; + } + } + return 0; +inval: + errno = EINVAL; + return -1; +} + +/* Validate the policy table as defined by RFC 33. The table can appear at + * the top level of the config or within a queues entry. + */ +static int validate_policy_json (json_t *policy, + const char *key, + const char **default_queue, + flux_error_t *error) +{ + json_error_t jerror; + json_t *jobspec = NULL; + json_t *limits = NULL; + json_t *access = NULL; + json_t *scheduler = NULL; + const char *defqueue = NULL; + char key2[1024]; + + if (json_unpack_ex (policy, + &jerror, + 0, + "{s?o s?o s?o s?o !}", + "jobspec", &jobspec, + "limits", &limits, + "access", &access, + "scheduler", &scheduler) < 0) { + errprintf (error, "%s: %s", key, jerror.text); + errno = EINVAL; + return -1; + } + if (jobspec) { + snprintf (key2, sizeof (key2), "%s.jobspec", key); + if (validate_policy_jobspec (jobspec, key2, &defqueue, error) < 0) + return -1; + } + if (limits) { + snprintf (key2, sizeof (key2), "%s.limits", key); + if (validate_policy_limits (limits, key2, error) < 0) + return -1; + } + if (access) { + snprintf (key2, sizeof (key2), "%s.access", key); + if (validate_policy_access (access, key2, error) < 0) + return -1; + } + if (default_queue) + *default_queue = defqueue; + return 0; +} + +static int validate_policy_config (const flux_conf_t *conf, + const char **default_queue, + flux_error_t *error) +{ + json_t *policy = NULL; + const char *defqueue = NULL; + + if (flux_conf_unpack (conf, + error, + "{s?o}", + "policy", &policy) < 0) + return -1; + if (policy) { + if (validate_policy_json (policy, "policy", &defqueue, error) < 0) + return -1; + } + if (default_queue) + *default_queue = defqueue; + return 0; +} + +static int validate_queues_config (const flux_conf_t *conf, + const char *default_queue, + flux_error_t *error) +{ + json_t *queues = NULL; + + if (flux_conf_unpack (conf, + error, + "{s?o}", + "queues", &queues) < 0) + return -1; + if (queues) { + const char *name; + json_t *entry; + + if (!json_is_object (queues)) { + errprintf (error, "queues must be a table"); + goto inval; + } + json_object_foreach (queues, name, entry) { + json_error_t jerror; + json_t *policy = NULL; + json_t *requires = NULL; + + if (json_unpack_ex (entry, + &jerror, + 0, + "{s?o s?o !}", + "policy", &policy, + "requires", &requires) < 0) { + errprintf (error, "queues.%s: %s", name, jerror.text); + goto inval; + } + if (policy) { + char key[1024]; + const char *defqueue; + snprintf (key, sizeof (key), "queues.%s.policy", name); + if (validate_policy_json (policy, key, &defqueue, error) < 0) + return -1; + if (defqueue) { + errprintf (error, + "%s: queue policy includes default queue!", + key); + goto inval; + } + } + if (requires) { + const char *banned_property_chars = " \t!&'\"`'|()"; + if (!is_string_array (requires, banned_property_chars)) { + errprintf (error, + "queues.%s.requires must be an array of %s", + name, + "property strings (RFC 20)"); + goto inval; + } + } + } + } + if (default_queue) { + if (!queues || !json_object_get (queues, default_queue)) { + errprintf (error, + "default queue '%s' is not in queues table", + default_queue); + goto inval; + } + } + return 0; +inval: + errno = EINVAL; + return -1; +} + +int policy_validate (const flux_conf_t *conf, flux_error_t *error) +{ + const char *defqueue; + + if (validate_policy_config (conf, &defqueue, error) < 0 + || validate_queues_config (conf, defqueue, error) < 0) + return -1; + return 0; +} + +// vi:ts=4 sw=4 expandtab diff --git a/src/common/libfluxutil/policy.h b/src/common/libfluxutil/policy.h new file mode 100644 index 000000000000..a60e72fcccfa --- /dev/null +++ b/src/common/libfluxutil/policy.h @@ -0,0 +1,23 @@ +/************************************************************\ + * Copyright 2022 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _LIBFLUXUTIL_POLICY_H +#define _LIBFLUXUTIL_POLICY_H + +#include + +/* Validate [policy] and [queue] configuration tables defined in RFC 33. + * Return 0 on success, -1 on failure with errno set and 'error' filled in. + */ +int policy_validate (const flux_conf_t *conf, flux_error_t *error); + +// vi:ts=4 sw=4 expandtab + +#endif // !_LIBFLUXUTIL_POLICY_H diff --git a/src/modules/job-ingest/job-ingest.c b/src/modules/job-ingest/job-ingest.c index 494efaaf1ec3..fea532f392c8 100644 --- a/src/modules/job-ingest/job-ingest.c +++ b/src/modules/job-ingest/job-ingest.c @@ -25,6 +25,7 @@ #include "src/common/libutil/jpath.h" #include "src/common/libutil/errprintf.h" #include "src/common/libjob/job_hash.h" +#include "src/common/libfluxutil/policy.h" #include "ccan/str/str.h" #include "util.h" @@ -647,6 +648,8 @@ static int job_ingest_configure (struct job_ingest_ctx *ctx, { flux_error_t conf_error; + if (policy_validate (conf, error) < 0) + return -1; if (pipeline_configure (ctx->pipeline, conf, argc, argv, error) < 0) return -1; if (flux_conf_unpack (conf, diff --git a/src/modules/job-manager/conf.c b/src/modules/job-manager/conf.c index f6df5d4ced93..b9e9842dc7f0 100644 --- a/src/modules/job-manager/conf.c +++ b/src/modules/job-manager/conf.c @@ -18,6 +18,7 @@ #include "src/common/libutil/fsd.h" #include "src/common/libutil/errprintf.h" +#include "src/common/libfluxutil/policy.h" #include "job-manager.h" #include "journal.h" @@ -116,6 +117,10 @@ static void config_reload_cb (flux_t *h, if (flux_conf_reload_decode (msg, &instance_conf) < 0) goto error; + if (policy_validate (instance_conf, &error) < 0) { + errstr = error.text; + goto error; + } ccb = zlistx_first (conf->callbacks); while (ccb) { if (ccb->cb (instance_conf, &error, ccb->arg) < 0) { @@ -154,13 +159,15 @@ void conf_destroy (struct conf *conf) } } -struct conf *conf_create (struct job_manager *ctx) +struct conf *conf_create (struct job_manager *ctx, flux_error_t *error) { struct conf *conf; if (!(conf = calloc (1, sizeof (*conf)))) - return NULL; + goto error; conf->ctx = ctx; + if (policy_validate (flux_get_conf (ctx->h), error) < 0) + goto error_nofill; if (!(conf->callbacks = zlistx_new ())) { errno = ENOMEM; goto error; @@ -170,6 +177,8 @@ struct conf *conf_create (struct job_manager *ctx) goto error; return conf; error: + errprintf (error, "%s", strerror (errno)); +error_nofill: conf_destroy (conf); return NULL; } diff --git a/src/modules/job-manager/conf.h b/src/modules/job-manager/conf.h index 69d0a90252aa..529d3ba1664c 100644 --- a/src/modules/job-manager/conf.h +++ b/src/modules/job-manager/conf.h @@ -25,7 +25,7 @@ typedef int (*conf_update_f)(const flux_conf_t *conf, flux_error_t *error, void *arg); -struct conf *conf_create (struct job_manager *ctx); +struct conf *conf_create (struct job_manager *ctx, flux_error_t *error); void conf_destroy (struct conf *conf); /* Immediately call 'cb' on current config object, and then on config updates diff --git a/src/modules/job-manager/job-manager.c b/src/modules/job-manager/job-manager.c index 9d05f78fe9d1..e7723578ee07 100644 --- a/src/modules/job-manager/job-manager.c +++ b/src/modules/job-manager/job-manager.c @@ -153,6 +153,7 @@ int mod_main (flux_t *h, int argc, char **argv) flux_reactor_t *r = flux_get_reactor (h); int rc = -1; struct job_manager ctx; + flux_error_t error; memset (&ctx, 0, sizeof (ctx)); ctx.h = h; @@ -166,8 +167,8 @@ int mod_main (flux_t *h, int argc, char **argv) zhashx_set_duplicator (ctx.active_jobs, job_duplicator); zhashx_set_destructor (ctx.inactive_jobs, job_destructor); zhashx_set_duplicator (ctx.inactive_jobs, job_duplicator); - if (!(ctx.conf = conf_create (&ctx))) { - flux_log_error (h, "error creating conf context"); + if (!(ctx.conf = conf_create (&ctx, &error))) { + flux_log (h, LOG_ERR, "config: %s", error.text); goto done; } if (!(ctx.jobtap = jobtap_create (&ctx))) { diff --git a/src/modules/job-manager/queue.c b/src/modules/job-manager/queue.c index dd3134203623..5f847fb04739 100644 --- a/src/modules/job-manager/queue.c +++ b/src/modules/job-manager/queue.c @@ -423,8 +423,9 @@ bool queue_started (struct queue *queue, struct job *job) return queue->anon->start; } -/* N.B. the broker will have already validated the basic queue configuration so - * we shouldn't need to produce detailed configuration errors for users here. +/* N.B. the basic queue configuration should have already been validated by + * policy_validate() so we shouldn't need to produce detailed configuration + * errors for users here. */ static int queue_configure (const flux_conf_t *conf, flux_error_t *error, diff --git a/t/t2245-policy-config.t b/t/t2245-policy-config.t index 9af7ed03994a..e3f7e9c9b876 100755 --- a/t/t2245-policy-config.t +++ b/t/t2245-policy-config.t @@ -10,7 +10,7 @@ specified in RFC 33 for [policy] and [queue..policy]. mkdir -p config -test_under_flux 1 minimal +test_under_flux 1 job flux setattr log-stderr-level 1 @@ -175,4 +175,12 @@ test_expect_success 'valid config passes' ' [queues.batch] EOT ' + +test_expect_success 'a bad config is detected at initialization too' ' + cat >badconf.toml <<-EOT && + policy.foo = 1 + EOT + test_must_fail flux start -o,--config-path=badconf.toml true +' + test_done