Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dws-jobtap: avoid setting aux too early #286

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 42 additions & 25 deletions src/job-manager/plugins/dws-jobtap.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <flux/core.h>
#include <flux/jobtap.h>

#define PLUGIN_NAME "dws"
#define CREATE_DEP_NAME "dws-create"
#define SETUP_PROLOG_NAME "dws-setup"
#define DWS_EPILOG_NAME "dws-epilog"
Expand All @@ -50,6 +51,11 @@ static inline const char *idf58 (flux_jobid_t id)
return buf;
}

static inline int current_job_exception (flux_plugin_t *p, const char *reason)
{
return flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, PLUGIN_NAME, 0, reason);
}

static inline int raise_job_exception (flux_plugin_t *p,
flux_jobid_t id,
const char *exception,
Expand Down Expand Up @@ -160,7 +166,7 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar
json_t *resources;
json_t *jobspec;
int userid;
int *prolog_active = NULL;
struct create_arg_t *create_args;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -179,29 +185,24 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar
&userid,
"jobspec",
&jobspec)
< 0)
< 0) {
current_job_exception (p, "jobtap plugin failed to unpack args");
return -1;
}
if (dw) {
if (flux_jobtap_dependency_add (p, id, CREATE_DEP_NAME) < 0) {
flux_log_error (h, "Failed to add dws jobtap dependency for %s", idf58 (id));
current_job_exception (p, "Failed to add dws jobtap dependency");
return -1;
}
// subscribe to exception events
if (flux_jobtap_job_subscribe (p, FLUX_JOBTAP_CURRENT_JOB) < 0
|| !(prolog_active = malloc (sizeof (int)))
|| flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"dws_prolog_active",
prolog_active,
free)
< 0) {
free (prolog_active);
if (flux_jobtap_job_subscribe (p, FLUX_JOBTAP_CURRENT_JOB) < 0) {
current_job_exception (p, "dws-jobtap: error initializing exception-monitoring");
flux_log_error (h,
"dws-jobtap: error initializing exception-monitoring for %s",
idf58 (id));
return -1;
}
*prolog_active = 0;
flux_future_t *create_fut = flux_rpc_pack (h,
"dws.create",
FLUX_NODEID_ANY,
Expand All @@ -217,11 +218,11 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar
userid);
if (create_fut == NULL) {
flux_log_error (h, "Failed to send dws.create RPC for %s", idf58 (id));
current_job_exception (p, "Failed to send dws.create RPC");
return -1;
}

struct create_arg_t *create_args = calloc (1, sizeof (struct create_arg_t));
if (create_args == NULL) {
if (!(create_args = calloc (1, sizeof (struct create_arg_t)))) {
current_job_exception (p, "Failed to allocate memory");
return -1;
}
create_args->p = p;
Expand All @@ -235,6 +236,7 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar
(flux_free_f)flux_future_destroy)
< 0) {
flux_future_destroy (create_fut);
current_job_exception (p, "Failed to set aux on future");
return -1;
}
}
Expand Down Expand Up @@ -350,17 +352,30 @@ static int run_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *args,
"dw",
&dw)
< 0) {
flux_log_error (h, "Failed to unpack args");
current_job_exception (p, "jobtap plugin failed to unpack args");
return -1;
}
if (dw) {
if (flux_jobtap_prolog_start (p, SETUP_PROLOG_NAME) < 0
|| !(prolog_active =
flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "dws_prolog_active"))) {
flux_log_error (h, "Failed to start dws jobtap prolog for %s", idf58 (id));
// set a boolean aux indicating whether jobtap prolog is active, so it can
// be finished if an exception occurs
if (!(prolog_active = malloc (sizeof (int)))
|| flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"dws_prolog_active",
prolog_active,
free)
< 0) {
free (prolog_active);
flux_log_error (h, "dws-jobtap: error creating prolog_active aux for %s", idf58 (id));
current_job_exception (p, "error creating prolog_active aux");
return -1;
}
*prolog_active = 1;
if (flux_jobtap_prolog_start (p, SETUP_PROLOG_NAME) < 0) {
flux_log_error (h, "Failed to start dws jobtap prolog for %s", idf58 (id));
current_job_exception (p, "Failed to start dws jobtap prolog");
return -1;
}
struct create_arg_t *create_args = calloc (1, sizeof (struct create_arg_t));
if (create_args == NULL) {
dws_prolog_finish (h, p, id, 0, "OOM", prolog_active);
Expand Down Expand Up @@ -426,14 +441,15 @@ static int cleanup_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *a
"dw",
&dw)
< 0) {
flux_log_error (h, "Failed to unpack args");
current_job_exception (p, "Failed to unpack args");
return -1;
}
// check that the job has a DW attr section AND a workflow was successfully
// created for it
if (dw && flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "flux::dws_workflow_created")) {
if (!(create_args = calloc (1, sizeof (struct create_arg_t)))) {
flux_log_error (h, "error allocating arg struct for %s", idf58 (id));
current_job_exception (p, "error allocating arg struct");
return -1;
}
create_args->p = p;
Expand All @@ -445,6 +461,7 @@ static int cleanup_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *a
if (flux_jobtap_job_aux_set (p, id, "dws_epilog_active", (void *)1, NULL) < 0
|| flux_jobtap_epilog_start (p, DWS_EPILOG_NAME) < 0) {
flux_log_error (h, "Failed to start jobtap epilog for %s", idf58 (id));
current_job_exception (p, "Failed to start jobtap epilog");
return -1;
}
if (!(post_run_fut = flux_rpc_pack (h,
Expand Down Expand Up @@ -612,7 +629,7 @@ static void resource_update_msg_cb (flux_t *h,
if (strlen (exclude_str) > 0) {
if (!(constraints = generate_constraints (h, p, jobid, exclude_str))) {
flux_log_error (h, "Could not generate exclusion constraint for %s", idf58 (jobid));
raise_job_exception (p, jobid, "dws", "Could not generate exclusion constraint");
raise_job_exception (p, jobid, PLUGIN_NAME, "Could not generate exclusion constraint");
return;
}
}
Expand All @@ -624,7 +641,7 @@ static void resource_update_msg_cb (flux_t *h,
idf58 (jobid));
errmsg_str = "<could not fetch error message>";
}
raise_job_exception (p, jobid, "dws", errmsg_str);
raise_job_exception (p, jobid, PLUGIN_NAME, errmsg_str);
json_decref (constraints);
return;
} else if (flux_jobtap_job_aux_set (p,
Expand All @@ -644,7 +661,7 @@ static void resource_update_msg_cb (flux_t *h,
flux_log_error (h,
"could not update jobspec for %s with new constraints and resources",
idf58 (jobid));
raise_job_exception (p, jobid, "dws", "Internal error: failed to update jobspec");
raise_job_exception (p, jobid, PLUGIN_NAME, "Internal error: failed to update jobspec");
json_decref (constraints);
return;
}
Expand Down Expand Up @@ -728,7 +745,7 @@ static const struct flux_plugin_handler tab[] = {

int flux_plugin_init (flux_plugin_t *p)
{
if (flux_plugin_register (p, "dws", tab) < 0
if (flux_plugin_register (p, PLUGIN_NAME, tab) < 0
|| flux_jobtap_service_register (p, "resource-update", resource_update_msg_cb, p) < 0
|| flux_jobtap_service_register (p, "prolog-remove", prolog_remove_msg_cb, p) < 0
|| flux_jobtap_service_register (p, "epilog-remove", epilog_remove_msg_cb, p) < 0) {
Expand Down