Skip to content

Commit

Permalink
resource: add support for post-partial cancel processing
Browse files Browse the repository at this point in the history
  • Loading branch information
milroy committed Oct 17, 2024
1 parent 8623b01 commit dcf93cd
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 35 deletions.
15 changes: 12 additions & 3 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1908,6 +1908,11 @@ static int run_remove (std::shared_ptr<resource_ctx_t> &ctx,
{
int rc = -1;
dfu_traverser_t &tr = *(ctx->traverser);
std::shared_ptr<job_info_t> info = nullptr;
bool pcanceled = false;

if (is_existent_jobid (ctx, jobid))
info = ctx->jobs[jobid];

if (part_cancel) {
// RV1exec only reader supported in production currently
Expand All @@ -1921,21 +1926,25 @@ static int run_remove (std::shared_ptr<resource_ctx_t> &ctx,
static_cast<intmax_t> (jobid));
goto out;
}
if (info)
info->set_pcanceled ();
rc = tr.remove (R, reader, jobid, full_removal);
} else {
rc = tr.remove (jobid);
if (info)
pcanceled = info->get_pcancel_state ();

rc = tr.remove (jobid, pcanceled);
full_removal = true;
}
if (rc != 0) {
if (is_existent_jobid (ctx, jobid)) {
if (info) {

Check warning on line 1940 in resource/modules/resource_match.cpp

View check run for this annotation

Codecov / codecov/patch

resource/modules/resource_match.cpp#L1940

Added line #L1940 was not covered by tests
// When this condition arises, we will be less likely
// to be able to reuse this jobid. Having the errored job
// in the jobs map will prevent us from reusing the jobid
// up front. Note that a same jobid can be reserved and
// removed multiple times by the upper queuing layer
// as part of providing advanced queueing policies
// (e.g., conservative backfill).
std::shared_ptr<job_info_t> info = ctx->jobs[jobid];
info->state = job_lifecycle_t::ERROR;
}
flux_log (ctx->h,
Expand Down
4 changes: 3 additions & 1 deletion resource/planner/c/planner_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,15 @@ int64_t planner_multi_add_span (planner_multi_t *ctx,
* \param ctx opaque multi-planner context returned
* from planner_multi_new.
* \param span_id span_id returned from planner_multi_add_span.
* \param post_pcancel bool indicating whether preceded by one or more partial
* cancels
* \return 0 on success; -1 on error with errno set as follows:
* EINVAL: invalid argument.
* EKEYREJECTED: span could not be removed from
* the planner's internal data structures.
* ERANGE: a resource state became out of a valid range.
*/
int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id);
int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id, bool post_pcancel);

/*! Reduce the existing span's resources from the planner.
* This function will be called for a partial release/cancel.
Expand Down
10 changes: 7 additions & 3 deletions resource/planner/c/planner_multi_c_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ extern "C" int64_t planner_multi_add_span (planner_multi_t *ctx,
return mspan;
}

extern "C" int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id)
extern "C" int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id, bool post_pcancel)
{
size_t i;
int rc = -1;
Expand All @@ -423,8 +423,12 @@ extern "C" int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id)
goto done;
}
for (i = 0; i < it->second.size (); ++i) {
if (planner_rem_span (ctx->plan_multi->get_planner_at (i), it->second[i]) == -1)
goto done;
if (planner_rem_span (ctx->plan_multi->get_planner_at (i), it->second[i]) == -1) {
// If executed after partial cancel, depending on pruning filter settings
// some spans may no longer exist. In that case there is no error.
if (!post_pcancel)
goto done;

Check warning on line 430 in resource/planner/c/planner_multi_c_interface.cpp

View check run for this annotation

Codecov / codecov/patch

resource/planner/c/planner_multi_c_interface.cpp#L429-L430

Added lines #L429 - L430 were not covered by tests
}
}
ctx->plan_multi->get_span_lookup ().erase (it);
rc = 0;
Expand Down
4 changes: 2 additions & 2 deletions resource/planner/test/planner_test02.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ static int test_multi_add_remove ()
span3 = planner_multi_add_span (ctx, 2000, 1000, request3, len);
ok ((span3 != -1), "span added for (%s)", ss.str ().c_str ());

rc = planner_multi_rem_span (ctx, span2);
rc = planner_multi_rem_span (ctx, span2, false);
ok (!rc, "multi_rem_span works");

size = planner_multi_span_size (ctx);
Expand Down Expand Up @@ -582,7 +582,7 @@ static int test_constructors_and_overload ()
bo = (bo || !(planner_multis_equal (ctx, ctx2)));
ok (!bo, "test copy constructor doesn't mutate planner");
// Compare planners after mutation
rc = planner_multi_rem_span (ctx2, span);
rc = planner_multi_rem_span (ctx2, span, false);
size = planner_multi_span_size (ctx2);
ok ((size == 2), "planner_multi_span_size works after copy");

Expand Down
3 changes: 2 additions & 1 deletion resource/readers/resource_reader_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
namespace Flux {
namespace resource_model {

enum class job_modify_t { CANCEL, PARTIAL_CANCEL, VTX_CANCEL };
enum class job_modify_t { CANCEL, POST_PCANCEL, PARTIAL_CANCEL, VTX_CANCEL };

struct modify_data_t {
job_modify_t mod_type = job_modify_t::PARTIAL_CANCEL;
std::unordered_set<int64_t> ranks_removed;
std::unordered_map<const char *, int64_t> type_to_count;
std::unordered_set<vtx_t> brokerless_res;
};

/*! Base resource reader class.
Expand Down
2 changes: 1 addition & 1 deletion resource/readers/resource_reader_jgf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ int resource_reader_jgf_t::cancel_vtx (vtx_t vtx,
if (agg_span != job2span.end ()) {
if ((subtree_plan = g[vtx].idata.subplans[containment_sub]) == NULL)
goto ret;
if (planner_multi_rem_span (subtree_plan, agg_span->second) != 0)
if (planner_multi_rem_span (subtree_plan, agg_span->second, false) != 0)
goto ret;
// Delete from job2span tracker
job2span.erase (update_data.jobid);
Expand Down
27 changes: 17 additions & 10 deletions resource/reapi/bindings/c++/reapi_cli_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,18 +668,23 @@ void resource_query_t::set_job (const uint64_t jobid, const std::shared_ptr<job_
int resource_query_t::remove_job (const uint64_t jobid)
{
int rc = -1;
std::shared_ptr<job_info_t> info = nullptr;
bool pcanceled = false;

Check warning on line 672 in resource/reapi/bindings/c++/reapi_cli_impl.hpp

View check run for this annotation

Codecov / codecov/patch

resource/reapi/bindings/c++/reapi_cli_impl.hpp#L671-L672

Added lines #L671 - L672 were not covered by tests

if (jobid > (uint64_t)std::numeric_limits<int64_t>::max ()) {
errno = EOVERFLOW;
return rc;
}

rc = traverser->remove (static_cast<int64_t> (jobid));
if (jobs.find (jobid) != jobs.end ()) {
info = jobs[jobid];
pcanceled = info->get_pcancel_state ();

Check warning on line 681 in resource/reapi/bindings/c++/reapi_cli_impl.hpp

View check run for this annotation

Codecov / codecov/patch

resource/reapi/bindings/c++/reapi_cli_impl.hpp#L679-L681

Added lines #L679 - L681 were not covered by tests
}

rc = traverser->remove (static_cast<int64_t> (jobid), pcanceled);

Check warning on line 684 in resource/reapi/bindings/c++/reapi_cli_impl.hpp

View check run for this annotation

Codecov / codecov/patch

resource/reapi/bindings/c++/reapi_cli_impl.hpp#L684

Added line #L684 was not covered by tests
if (rc == 0) {
if (jobs.find (jobid) != jobs.end ()) {
std::shared_ptr<job_info_t> info = jobs[jobid];
if (info)

Check warning on line 686 in resource/reapi/bindings/c++/reapi_cli_impl.hpp

View check run for this annotation

Codecov / codecov/patch

resource/reapi/bindings/c++/reapi_cli_impl.hpp#L686

Added line #L686 was not covered by tests
info->state = job_lifecycle_t::CANCELED;
}
} else {
m_err_msg += traverser->err_message ();
traverser->clear_err_message ();
Expand All @@ -691,6 +696,7 @@ int resource_query_t::remove_job (const uint64_t jobid, const std::string &R, bo
{
int rc = -1;
std::shared_ptr<resource_reader_base_t> reader;
std::shared_ptr<job_info_t> info = nullptr;

Check warning on line 699 in resource/reapi/bindings/c++/reapi_cli_impl.hpp

View check run for this annotation

Codecov / codecov/patch

resource/reapi/bindings/c++/reapi_cli_impl.hpp#L699

Added line #L699 was not covered by tests

if (jobid > (uint64_t)std::numeric_limits<int64_t>::max ()) {
errno = EOVERFLOW;
Expand All @@ -706,14 +712,15 @@ int resource_query_t::remove_job (const uint64_t jobid, const std::string &R, bo
return rc;
}

if (jobs.find (jobid) != jobs.end ()) {
info = jobs[jobid];
info->set_pcanceled ();

Check warning on line 717 in resource/reapi/bindings/c++/reapi_cli_impl.hpp

View check run for this annotation

Codecov / codecov/patch

resource/reapi/bindings/c++/reapi_cli_impl.hpp#L715-L717

Added lines #L715 - L717 were not covered by tests
}

rc = traverser->remove (R, reader, static_cast<int64_t> (jobid), full_removal);
if (rc == 0) {
if (full_removal) {
auto job_info_it = jobs.find (jobid);
if (job_info_it != jobs.end ()) {
job_info_it->second->state = job_lifecycle_t::CANCELED;
}
}
if (full_removal && info)
info->state = job_lifecycle_t::CANCELED;

Check warning on line 723 in resource/reapi/bindings/c++/reapi_cli_impl.hpp

View check run for this annotation

Codecov / codecov/patch

resource/reapi/bindings/c++/reapi_cli_impl.hpp#L722-L723

Added lines #L722 - L723 were not covered by tests
} else {
m_err_msg += traverser->err_message ();
traverser->clear_err_message ();
Expand Down
4 changes: 2 additions & 2 deletions resource/traversers/dfu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ int dfu_traverser_t::find (std::shared_ptr<match_writers_t> &writers, const std:
return detail::dfu_impl_t::find (writers, criteria);
}

int dfu_traverser_t::remove (int64_t jobid)
int dfu_traverser_t::remove (int64_t jobid, bool pcanceled)
{
subsystem_t dom = get_match_cb ()->dom_subsystem ();
if (!get_graph () || !get_graph_db ()
Expand All @@ -449,7 +449,7 @@ int dfu_traverser_t::remove (int64_t jobid)
}

vtx_t root = get_graph_db ()->metadata.roots.at (dom);
return detail::dfu_impl_t::remove (root, jobid);
return detail::dfu_impl_t::remove (root, jobid, pcanceled);
}

int dfu_traverser_t::remove (const std::string &R_to_cancel,
Expand Down
4 changes: 3 additions & 1 deletion resource/traversers/dfu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ class dfu_traverser_t : protected detail::dfu_impl_t {
* the resource state.
*
* \param jobid job id.
* \param pcanceled bool indicating whether preceded by one or more partial
* cancels
* \return 0 on success; -1 on error.
* EINVAL: graph, roots or match callback not set.
*/
int remove (int64_t jobid);
int remove (int64_t jobid, bool pcanceled);

/*! Remove the allocation/reservation referred to by jobid and update
* the resource state.
Expand Down
4 changes: 3 additions & 1 deletion resource/traversers/dfu_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,11 @@ class dfu_impl_t {
*
* \param root root resource vertex.
* \param jobid job id.
* \param pcanceled bool indicating whether preceded by one or more partial
* cancels
* \return 0 on success; -1 on error.
*/
int remove (vtx_t root, int64_t jobid);
int remove (vtx_t root, int64_t jobid, bool pcanceled);

/*! Remove the allocation/reservation referred to by jobid and update
* the resource state.
Expand Down
29 changes: 26 additions & 3 deletions resource/traversers/dfu_impl_update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,11 @@ int dfu_impl_t::mod_agfilter (vtx_t u,
goto done;
}
if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) {
if ((rc = planner_multi_rem_span (subtree_plan, span_it->second)) != 0) {
bool post_pcancel = false;
if (mod_data.mod_type == job_modify_t::POST_PCANCEL)
post_pcancel = true;

if ((rc = planner_multi_rem_span (subtree_plan, span_it->second, post_pcancel)) != 0) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_multi_rem_span returned -1.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
Expand Down Expand Up @@ -536,6 +540,9 @@ int dfu_impl_t::mod_plan (vtx_t u, int64_t jobid, modify_data_t &mod_data)
if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) {
(*m_graph)[u].schedule.allocations.erase (alloc_span);
} else {
// Need to convert to c_str () because interner is finalized
if (std::string ((*m_graph)[u].type.c_str ()) == "ssd")
mod_data.brokerless_res.insert (u);

Check warning on line 545 in resource/traversers/dfu_impl_update.cpp

View check run for this annotation

Codecov / codecov/patch

resource/traversers/dfu_impl_update.cpp#L545

Added line #L545 was not covered by tests
goto done;
}
} else if ((res_span = (*m_graph)[u].schedule.reservations.find (jobid))
Expand Down Expand Up @@ -771,12 +778,16 @@ int dfu_impl_t::update (vtx_t root,
return (rc > 0) ? 0 : -1;
}

int dfu_impl_t::remove (vtx_t root, int64_t jobid)
int dfu_impl_t::remove (vtx_t root, int64_t jobid, bool pcanceled)
{
bool root_has_jtag =
((*m_graph)[root].idata.tags.find (jobid) != (*m_graph)[root].idata.tags.end ());
modify_data_t mod_data;
mod_data.mod_type = job_modify_t::CANCEL;

if (pcanceled)
mod_data.mod_type = job_modify_t::POST_PCANCEL;
else
mod_data.mod_type = job_modify_t::CANCEL;
m_color.reset ();
return (root_has_jtag) ? mod_dfv (root, jobid, mod_data) : mod_exv (jobid, mod_data);
}
Expand Down Expand Up @@ -829,6 +840,18 @@ int dfu_impl_t::remove (vtx_t root,
m_color.reset ();
if (root_has_jtag) {
rc = mod_dfv (root, jobid, mod_data);

// Need to re-run VTX_CANCEL in case brokerless resources (e.g., SSDs)
// were found in the previous VTX_CANCEL
if (mod_data.brokerless_res.size () > 0) {
mod_data.mod_type = job_modify_t::VTX_CANCEL;
for (const vtx_t &vtx : mod_data.brokerless_res) {
if ((rc = cancel_vertex (vtx, mod_data, jobid)) != 0) {
errno = EINVAL;
return rc;

Check warning on line 851 in resource/traversers/dfu_impl_update.cpp

View check run for this annotation

Codecov / codecov/patch

resource/traversers/dfu_impl_update.cpp#L847-L851

Added lines #L847 - L851 were not covered by tests
}
}
}
// Was the root vertex's job tag removed? If so, full_cancel
full_cancel =
((*m_graph)[root].idata.tags.find (jobid) == (*m_graph)[root].idata.tags.end ());
Expand Down
22 changes: 15 additions & 7 deletions resource/utilities/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,16 @@ command_t commands[] =
static int do_remove (std::shared_ptr<resource_context_t> &ctx, int64_t jobid)
{
int rc = -1;
if ((rc = ctx->traverser->remove ((int64_t)jobid)) == 0) {
if (ctx->jobs.find (jobid) != ctx->jobs.end ()) {
std::shared_ptr<job_info_t> info = ctx->jobs[jobid];
bool pcanceled = false;
std::shared_ptr<job_info_t> info = nullptr;

if (ctx->jobs.find (jobid) != ctx->jobs.end ()) {
info = ctx->jobs[jobid];
pcanceled = info->get_pcancel_state ();
}
if ((rc = ctx->traverser->remove ((int64_t)jobid, pcanceled)) == 0) {
if (info)
info->state = job_lifecycle_t::CANCELED;
}
} else {
std::cout << ctx->traverser->err_message ();
ctx->traverser->clear_err_message ();
Expand All @@ -127,12 +132,15 @@ static int do_partial_remove (std::shared_ptr<resource_context_t> &ctx,
bool &full_cancel)
{
int rc = -1;
std::shared_ptr<job_info_t> info = nullptr;

if (ctx->jobs.find (jobid) != ctx->jobs.end ()) {
info = ctx->jobs[jobid];
info->set_pcanceled ();
}
if ((rc = ctx->traverser->remove (R_cancel, reader, (int64_t)jobid, full_cancel)) == 0) {
if (full_cancel && (ctx->jobs.find (jobid) != ctx->jobs.end ())) {
std::shared_ptr<job_info_t> info = ctx->jobs[jobid];
if (full_cancel && info)
info->state = job_lifecycle_t::CANCELED;
}
} else {
std::cout << ctx->traverser->err_message ();
ctx->traverser->clear_err_message ();
Expand Down

0 comments on commit dcf93cd

Please sign in to comment.