diff --git a/src/job-manager/plugins/dws-jobtap.c b/src/job-manager/plugins/dws-jobtap.c index 42ef741..47a86f8 100644 --- a/src/job-manager/plugins/dws-jobtap.c +++ b/src/job-manager/plugins/dws-jobtap.c @@ -26,6 +26,7 @@ #include #include +#define PLUGIN_NAME "dws" #define CREATE_DEP_NAME "dws-create" #define SETUP_PROLOG_NAME "dws-setup" #define DWS_EPILOG_NAME "dws-epilog" @@ -50,6 +51,11 @@ static inline const char *idf58 (flux_jobid_t id) return buf; } +static inline int current_job_exception (flux_plugin_t *p, const char *reason) +{ + return flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, PLUGIN_NAME, 0, reason); +} + static inline int raise_job_exception (flux_plugin_t *p, flux_jobid_t id, const char *exception, @@ -160,7 +166,7 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar json_t *resources; json_t *jobspec; int userid; - int *prolog_active = NULL; + struct create_arg_t *create_args; if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, @@ -179,29 +185,24 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar &userid, "jobspec", &jobspec) - < 0) + < 0) { + current_job_exception (p, "jobtap plugin failed to unpack args"); return -1; + } if (dw) { if (flux_jobtap_dependency_add (p, id, CREATE_DEP_NAME) < 0) { flux_log_error (h, "Failed to add dws jobtap dependency for %s", idf58 (id)); + current_job_exception (p, "Failed to add dws jobtap dependency"); return -1; } // subscribe to exception events - if (flux_jobtap_job_subscribe (p, FLUX_JOBTAP_CURRENT_JOB) < 0 - || !(prolog_active = malloc (sizeof (int))) - || flux_jobtap_job_aux_set (p, - FLUX_JOBTAP_CURRENT_JOB, - "dws_prolog_active", - prolog_active, - free) - < 0) { - free (prolog_active); + if (flux_jobtap_job_subscribe (p, FLUX_JOBTAP_CURRENT_JOB) < 0) { + current_job_exception (p, "dws-jobtap: error initializing exception-monitoring"); flux_log_error (h, "dws-jobtap: error initializing exception-monitoring for %s", idf58 (id)); return -1; } - *prolog_active = 0; flux_future_t *create_fut = flux_rpc_pack (h, "dws.create", FLUX_NODEID_ANY, @@ -217,11 +218,11 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar userid); if (create_fut == NULL) { flux_log_error (h, "Failed to send dws.create RPC for %s", idf58 (id)); + current_job_exception (p, "Failed to send dws.create RPC"); return -1; } - - struct create_arg_t *create_args = calloc (1, sizeof (struct create_arg_t)); - if (create_args == NULL) { + if (!(create_args = calloc (1, sizeof (struct create_arg_t)))) { + current_job_exception (p, "Failed to allocate memory"); return -1; } create_args->p = p; @@ -235,6 +236,7 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar (flux_free_f)flux_future_destroy) < 0) { flux_future_destroy (create_fut); + current_job_exception (p, "Failed to set aux on future"); return -1; } } @@ -350,17 +352,30 @@ static int run_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *args, "dw", &dw) < 0) { - flux_log_error (h, "Failed to unpack args"); + current_job_exception (p, "jobtap plugin failed to unpack args"); return -1; } if (dw) { - if (flux_jobtap_prolog_start (p, SETUP_PROLOG_NAME) < 0 - || !(prolog_active = - flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "dws_prolog_active"))) { - flux_log_error (h, "Failed to start dws jobtap prolog for %s", idf58 (id)); + // set a boolean aux indicating whether jobtap prolog is active, so it can + // be finished if an exception occurs + if (!(prolog_active = malloc (sizeof (int))) + || flux_jobtap_job_aux_set (p, + FLUX_JOBTAP_CURRENT_JOB, + "dws_prolog_active", + prolog_active, + free) + < 0) { + free (prolog_active); + flux_log_error (h, "dws-jobtap: error creating prolog_active aux for %s", idf58 (id)); + current_job_exception (p, "error creating prolog_active aux"); return -1; } *prolog_active = 1; + if (flux_jobtap_prolog_start (p, SETUP_PROLOG_NAME) < 0) { + flux_log_error (h, "Failed to start dws jobtap prolog for %s", idf58 (id)); + current_job_exception (p, "Failed to start dws jobtap prolog"); + return -1; + } struct create_arg_t *create_args = calloc (1, sizeof (struct create_arg_t)); if (create_args == NULL) { dws_prolog_finish (h, p, id, 0, "OOM", prolog_active); @@ -426,7 +441,7 @@ static int cleanup_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *a "dw", &dw) < 0) { - flux_log_error (h, "Failed to unpack args"); + current_job_exception (p, "Failed to unpack args"); return -1; } // check that the job has a DW attr section AND a workflow was successfully @@ -434,6 +449,7 @@ static int cleanup_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *a if (dw && flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "flux::dws_workflow_created")) { if (!(create_args = calloc (1, sizeof (struct create_arg_t)))) { flux_log_error (h, "error allocating arg struct for %s", idf58 (id)); + current_job_exception (p, "error allocating arg struct"); return -1; } create_args->p = p; @@ -445,6 +461,7 @@ static int cleanup_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *a if (flux_jobtap_job_aux_set (p, id, "dws_epilog_active", (void *)1, NULL) < 0 || flux_jobtap_epilog_start (p, DWS_EPILOG_NAME) < 0) { flux_log_error (h, "Failed to start jobtap epilog for %s", idf58 (id)); + current_job_exception (p, "Failed to start jobtap epilog"); return -1; } if (!(post_run_fut = flux_rpc_pack (h, @@ -612,7 +629,7 @@ static void resource_update_msg_cb (flux_t *h, if (strlen (exclude_str) > 0) { if (!(constraints = generate_constraints (h, p, jobid, exclude_str))) { flux_log_error (h, "Could not generate exclusion constraint for %s", idf58 (jobid)); - raise_job_exception (p, jobid, "dws", "Could not generate exclusion constraint"); + raise_job_exception (p, jobid, PLUGIN_NAME, "Could not generate exclusion constraint"); return; } } @@ -624,7 +641,7 @@ static void resource_update_msg_cb (flux_t *h, idf58 (jobid)); errmsg_str = ""; } - raise_job_exception (p, jobid, "dws", errmsg_str); + raise_job_exception (p, jobid, PLUGIN_NAME, errmsg_str); json_decref (constraints); return; } else if (flux_jobtap_job_aux_set (p, @@ -644,7 +661,7 @@ static void resource_update_msg_cb (flux_t *h, flux_log_error (h, "could not update jobspec for %s with new constraints and resources", idf58 (jobid)); - raise_job_exception (p, jobid, "dws", "Internal error: failed to update jobspec"); + raise_job_exception (p, jobid, PLUGIN_NAME, "Internal error: failed to update jobspec"); json_decref (constraints); return; } @@ -728,7 +745,7 @@ static const struct flux_plugin_handler tab[] = { int flux_plugin_init (flux_plugin_t *p) { - if (flux_plugin_register (p, "dws", tab) < 0 + if (flux_plugin_register (p, PLUGIN_NAME, tab) < 0 || flux_jobtap_service_register (p, "resource-update", resource_update_msg_cb, p) < 0 || flux_jobtap_service_register (p, "prolog-remove", prolog_remove_msg_cb, p) < 0 || flux_jobtap_service_register (p, "epilog-remove", epilog_remove_msg_cb, p) < 0) {