Skip to content

Commit

Permalink
Merge pull request #286 from jameshcorbett/dws-jobtap-aux-only-in-run
Browse files Browse the repository at this point in the history
dws-jobtap: avoid setting aux too early
  • Loading branch information
mergify[bot] authored Feb 25, 2025
2 parents 6182c14 + 291672c commit 385475c
Showing 1 changed file with 42 additions and 25 deletions.
67 changes: 42 additions & 25 deletions src/job-manager/plugins/dws-jobtap.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <flux/core.h>
#include <flux/jobtap.h>

#define PLUGIN_NAME "dws"
#define CREATE_DEP_NAME "dws-create"
#define SETUP_PROLOG_NAME "dws-setup"
#define DWS_EPILOG_NAME "dws-epilog"
Expand All @@ -50,6 +51,11 @@ static inline const char *idf58 (flux_jobid_t id)
return buf;
}

static inline int current_job_exception (flux_plugin_t *p, const char *reason)
{
return flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, PLUGIN_NAME, 0, reason);
}

static inline int raise_job_exception (flux_plugin_t *p,
flux_jobid_t id,
const char *exception,
Expand Down Expand Up @@ -160,7 +166,7 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar
json_t *resources;
json_t *jobspec;
int userid;
int *prolog_active = NULL;
struct create_arg_t *create_args;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
Expand All @@ -179,29 +185,24 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar
&userid,
"jobspec",
&jobspec)
< 0)
< 0) {
current_job_exception (p, "jobtap plugin failed to unpack args");
return -1;
}
if (dw) {
if (flux_jobtap_dependency_add (p, id, CREATE_DEP_NAME) < 0) {
flux_log_error (h, "Failed to add dws jobtap dependency for %s", idf58 (id));
current_job_exception (p, "Failed to add dws jobtap dependency");
return -1;
}
// subscribe to exception events
if (flux_jobtap_job_subscribe (p, FLUX_JOBTAP_CURRENT_JOB) < 0
|| !(prolog_active = malloc (sizeof (int)))
|| flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"dws_prolog_active",
prolog_active,
free)
< 0) {
free (prolog_active);
if (flux_jobtap_job_subscribe (p, FLUX_JOBTAP_CURRENT_JOB) < 0) {
current_job_exception (p, "dws-jobtap: error initializing exception-monitoring");
flux_log_error (h,
"dws-jobtap: error initializing exception-monitoring for %s",
idf58 (id));
return -1;
}
*prolog_active = 0;
flux_future_t *create_fut = flux_rpc_pack (h,
"dws.create",
FLUX_NODEID_ANY,
Expand All @@ -217,11 +218,11 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar
userid);
if (create_fut == NULL) {
flux_log_error (h, "Failed to send dws.create RPC for %s", idf58 (id));
current_job_exception (p, "Failed to send dws.create RPC");
return -1;
}

struct create_arg_t *create_args = calloc (1, sizeof (struct create_arg_t));
if (create_args == NULL) {
if (!(create_args = calloc (1, sizeof (struct create_arg_t)))) {
current_job_exception (p, "Failed to allocate memory");
return -1;
}
create_args->p = p;
Expand All @@ -235,6 +236,7 @@ static int depend_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *ar
(flux_free_f)flux_future_destroy)
< 0) {
flux_future_destroy (create_fut);
current_job_exception (p, "Failed to set aux on future");
return -1;
}
}
Expand Down Expand Up @@ -350,17 +352,30 @@ static int run_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *args,
"dw",
&dw)
< 0) {
flux_log_error (h, "Failed to unpack args");
current_job_exception (p, "jobtap plugin failed to unpack args");
return -1;
}
if (dw) {
if (flux_jobtap_prolog_start (p, SETUP_PROLOG_NAME) < 0
|| !(prolog_active =
flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "dws_prolog_active"))) {
flux_log_error (h, "Failed to start dws jobtap prolog for %s", idf58 (id));
// set a boolean aux indicating whether jobtap prolog is active, so it can
// be finished if an exception occurs
if (!(prolog_active = malloc (sizeof (int)))
|| flux_jobtap_job_aux_set (p,
FLUX_JOBTAP_CURRENT_JOB,
"dws_prolog_active",
prolog_active,
free)
< 0) {
free (prolog_active);
flux_log_error (h, "dws-jobtap: error creating prolog_active aux for %s", idf58 (id));
current_job_exception (p, "error creating prolog_active aux");
return -1;
}
*prolog_active = 1;
if (flux_jobtap_prolog_start (p, SETUP_PROLOG_NAME) < 0) {
flux_log_error (h, "Failed to start dws jobtap prolog for %s", idf58 (id));
current_job_exception (p, "Failed to start dws jobtap prolog");
return -1;
}
struct create_arg_t *create_args = calloc (1, sizeof (struct create_arg_t));
if (create_args == NULL) {
dws_prolog_finish (h, p, id, 0, "OOM", prolog_active);
Expand Down Expand Up @@ -426,14 +441,15 @@ static int cleanup_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *a
"dw",
&dw)
< 0) {
flux_log_error (h, "Failed to unpack args");
current_job_exception (p, "Failed to unpack args");
return -1;
}
// check that the job has a DW attr section AND a workflow was successfully
// created for it
if (dw && flux_jobtap_job_aux_get (p, FLUX_JOBTAP_CURRENT_JOB, "flux::dws_workflow_created")) {
if (!(create_args = calloc (1, sizeof (struct create_arg_t)))) {
flux_log_error (h, "error allocating arg struct for %s", idf58 (id));
current_job_exception (p, "error allocating arg struct");
return -1;
}
create_args->p = p;
Expand All @@ -445,6 +461,7 @@ static int cleanup_cb (flux_plugin_t *p, const char *topic, flux_plugin_arg_t *a
if (flux_jobtap_job_aux_set (p, id, "dws_epilog_active", (void *)1, NULL) < 0
|| flux_jobtap_epilog_start (p, DWS_EPILOG_NAME) < 0) {
flux_log_error (h, "Failed to start jobtap epilog for %s", idf58 (id));
current_job_exception (p, "Failed to start jobtap epilog");
return -1;
}
if (!(post_run_fut = flux_rpc_pack (h,
Expand Down Expand Up @@ -612,7 +629,7 @@ static void resource_update_msg_cb (flux_t *h,
if (strlen (exclude_str) > 0) {
if (!(constraints = generate_constraints (h, p, jobid, exclude_str))) {
flux_log_error (h, "Could not generate exclusion constraint for %s", idf58 (jobid));
raise_job_exception (p, jobid, "dws", "Could not generate exclusion constraint");
raise_job_exception (p, jobid, PLUGIN_NAME, "Could not generate exclusion constraint");
return;
}
}
Expand All @@ -624,7 +641,7 @@ static void resource_update_msg_cb (flux_t *h,
idf58 (jobid));
errmsg_str = "<could not fetch error message>";
}
raise_job_exception (p, jobid, "dws", errmsg_str);
raise_job_exception (p, jobid, PLUGIN_NAME, errmsg_str);
json_decref (constraints);
return;
} else if (flux_jobtap_job_aux_set (p,
Expand All @@ -644,7 +661,7 @@ static void resource_update_msg_cb (flux_t *h,
flux_log_error (h,
"could not update jobspec for %s with new constraints and resources",
idf58 (jobid));
raise_job_exception (p, jobid, "dws", "Internal error: failed to update jobspec");
raise_job_exception (p, jobid, PLUGIN_NAME, "Internal error: failed to update jobspec");
json_decref (constraints);
return;
}
Expand Down Expand Up @@ -728,7 +745,7 @@ static const struct flux_plugin_handler tab[] = {

int flux_plugin_init (flux_plugin_t *p)
{
if (flux_plugin_register (p, "dws", tab) < 0
if (flux_plugin_register (p, PLUGIN_NAME, tab) < 0
|| flux_jobtap_service_register (p, "resource-update", resource_update_msg_cb, p) < 0
|| flux_jobtap_service_register (p, "prolog-remove", prolog_remove_msg_cb, p) < 0
|| flux_jobtap_service_register (p, "epilog-remove", epilog_remove_msg_cb, p) < 0) {
Expand Down

0 comments on commit 385475c

Please sign in to comment.