diff --git a/taskvine/src/manager/Makefile b/taskvine/src/manager/Makefile index 849cb50262..a036e9bd64 100644 --- a/taskvine/src/manager/Makefile +++ b/taskvine/src/manager/Makefile @@ -27,7 +27,8 @@ SOURCES = \ vine_current_transfers.c \ vine_file_replica_table.c \ vine_fair.c \ - vine_runtime_dir.c + vine_runtime_dir.c \ + vine_task_groups.c PUBLIC_HEADERS = taskvine.h diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h index b9f221fc66..d5d10a39d2 100644 --- a/taskvine/src/manager/taskvine.h +++ b/taskvine/src/manager/taskvine.h @@ -73,11 +73,11 @@ typedef enum { /** Select overall scheduling algorithm for matching tasks to workers. */ typedef enum { VINE_SCHEDULE_UNSET = 0, /**< Internal use only. */ - VINE_SCHEDULE_FCFS, /**< Select worker on a first-come-first-serve basis. */ - VINE_SCHEDULE_FILES, /**< Select worker that has the most data required by the task. (default) */ - VINE_SCHEDULE_TIME, /**< Select worker that has the fastest execution time on previous tasks. */ - VINE_SCHEDULE_RAND, /**< Select a random worker. */ - VINE_SCHEDULE_WORST /**< Select the worst fit worker (the worker with more unused resources). */ + VINE_SCHEDULE_FCFS, /**< Select worker on a first-come-first-serve basis. */ + VINE_SCHEDULE_FILES, /**< Select worker that has the most data required by the task. (default) */ + VINE_SCHEDULE_TIME, /**< Select worker that has the fastest execution time on previous tasks. */ + VINE_SCHEDULE_RAND, /**< Select a random worker. */ + VINE_SCHEDULE_WORST /**< Select the worst fit worker (the worker with more unused resources). */ } vine_schedule_t; /** Possible outcomes for a task, returned by @ref vine_task_get_result. diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index c9c924f2c5..c71e36a6b7 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -24,6 +24,7 @@ See the file COPYING for details. #include "vine_runtime_dir.h" #include "vine_schedule.h" #include "vine_task.h" +#include "vine_task_groups.h" #include "vine_task_info.h" #include "vine_taskgraph_log.h" #include "vine_txn_log.h" @@ -2990,6 +2991,35 @@ static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct v return result; } +static vine_result_code_t commit_task_group_to_worker(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t) +{ + vine_result_code_t result = VINE_SUCCESS; + + struct list *l = NULL; + if (t->group_id) { + l = itable_lookup(q->task_group_table, t->group_id); + list_remove(l, t); + // decrement refcount + vine_task_delete(t); + } + + int counter = 0; + do { + + if (counter && (result == VINE_SUCCESS)) { + int t_idx = priority_queue_find_idx(q->ready_tasks, t); + priority_queue_remove(q->ready_tasks, t_idx); + // decrement refcount + vine_task_delete(t); + } + result = commit_task_to_worker(q, w, t); + counter++; + } while ((l && (t = list_pop_head(l)))); + + debug(D_VINE, "Sent batch of %d tasks to worker %s", counter, w->hostname); + return result; +} + /* 1 if task resubmitted, 0 otherwise */ static int resubmit_task_on_exhaustion(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t) { @@ -3297,6 +3327,9 @@ static void vine_manager_consider_recovery_task(struct vine_manager *q, struct v if (!rt) return; + /* Do not try to group recovery tasks */ + rt->group_id = 0; + switch (rt->state) { case VINE_TASK_INITIAL: /* The recovery task has never been run, so submit it now. */ @@ -3438,7 +3471,26 @@ static int send_one_task(struct vine_manager *q) if (w) { priority_queue_remove(q->ready_tasks, t_idx); - vine_result_code_t result = commit_task_to_worker(q, w, t); + + // do not continue if this worker is running a group task + if (q->task_groups_enabled) { + struct vine_task *it; + uint64_t taskid; + ITABLE_ITERATE(w->current_tasks, taskid, it) + { + if (it->group_id) { + return 0; + } + } + } + + vine_result_code_t result; + if (q->task_groups_enabled) { + result = commit_task_group_to_worker(q, w, t); + } else { + result = commit_task_to_worker(q, w, t); + } + switch (result) { case VINE_SUCCESS: /* return on successful commit. */ @@ -3943,6 +3995,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->factory_table = hash_table_create(0, 0); q->current_transfer_table = hash_table_create(0, 0); + q->task_group_table = itable_create(0); + q->group_id_counter = 1; q->fetch_factory = 0; q->measured_local_resources = rmsummary_create(-1); @@ -4008,6 +4062,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert // peer transfers enabled by default q->peer_transfers_enabled = 1; + q->task_groups_enabled = 0; + q->load_from_shared_fs_enabled = 0; q->file_source_max_transfers = VINE_FILE_SOURCE_MAX_TRANSFERS; @@ -4285,6 +4341,9 @@ void vine_delete(struct vine_manager *q) vine_current_transfers_clear(q); hash_table_delete(q->current_transfer_table); + vine_task_groups_clear(q); + itable_delete(q->task_group_table); + itable_clear(q->tasks, (void *)delete_task_at_exit); itable_delete(q->tasks); @@ -4654,6 +4713,11 @@ int vine_submit(struct vine_manager *q, struct vine_task *t) vine_task_set_scheduler(t, VINE_SCHEDULE_FILES); } + /* Attempt to group this task based on temp dependencies. */ + if (q->task_groups_enabled) { + vine_task_groups_assign_task(q, t); + } + /* If the task produces temporary files, create recovery tasks for those. */ vine_manager_create_recovery_tasks(q, t); @@ -5472,6 +5536,15 @@ int vine_cancel_by_task_id(struct vine_manager *q, int task_id) return 0; } + if (task->group_id) { + struct list *l = itable_lookup(q->task_group_table, task->group_id); + if (l) { + list_remove(l, task); + } + task->group_id = 0; + vine_task_delete(task); + } + reset_task_to_state(q, task, VINE_TASK_RETRIEVED); task->result = VINE_RESULT_CANCELLED; @@ -5673,6 +5746,9 @@ int vine_tune(struct vine_manager *q, const char *name, double value) } else if (!strcmp(name, "update-interval")) { q->update_interval = MAX(1, (int)value); + } else if (!strcmp(name, "task-groups")) { + q->task_groups_enabled = MIN(1, (int)value); + } else if (!strcmp(name, "resource-management-interval")) { q->resource_management_interval = MAX(1, (int)value); diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h index cf93c79d9a..4358598afa 100644 --- a/taskvine/src/manager/vine_manager.h +++ b/taskvine/src/manager/vine_manager.h @@ -118,6 +118,7 @@ struct vine_manager { struct hash_table *workers_with_watched_file_updates; /* Maps link -> vine_worker_info */ struct hash_table *workers_with_complete_tasks; /* Maps link -> vine_worker_info */ struct hash_table *current_transfer_table; /* Maps uuid -> struct transfer_pair */ + struct itable *task_group_table; /* Maps group id -> list vine_task */ /* Primary data structures for tracking files. */ @@ -182,6 +183,10 @@ struct vine_manager { int tasks_waiting_last_hungry; /* Number of tasks originally waiting when call to vine_hungry_computation was made. */ timestamp_t hungry_check_interval; /* Maximum interval between vine_hungry_computation checks. */ + /* Task Groups Configuration */ + int task_groups_enabled; + int group_id_counter; + /* Various performance knobs that can be tuned. */ int short_timeout; /* Timeout in seconds to send/recv a brief message from worker */ int long_timeout; /* Timeout if in the middle of an incomplete message. */ diff --git a/taskvine/src/manager/vine_manager_put.c b/taskvine/src/manager/vine_manager_put.c index e8e22b22bf..2724fe945f 100644 --- a/taskvine/src/manager/vine_manager_put.c +++ b/taskvine/src/manager/vine_manager_put.c @@ -561,6 +561,10 @@ vine_result_code_t vine_manager_put_task( } } + if (t->group_id) { + vine_manager_send(q, w, "groupid %d\n", t->group_id); + } + // vine_manager_send returns the number of bytes sent, or a number less than // zero to indicate errors. We are lazy here, we only check the last // message we sent to the worker (other messages may have failed above). diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c index 147f8170d3..c0db71c030 100644 --- a/taskvine/src/manager/vine_task.c +++ b/taskvine/src/manager/vine_task.c @@ -81,6 +81,7 @@ struct vine_task *vine_task_create(const char *command_line) t->priority = 0; vine_counters.task.created++; + t->group_id = 0; return t; } @@ -256,6 +257,11 @@ struct vine_task *vine_task_copy(const struct vine_task *task) new->resources_requested = rmsummary_copy(task->resources_requested, 0); } + /* Group ID is copied. */ + if (task->group_id) { + new->group_id = task->group_id; + } + return new; } diff --git a/taskvine/src/manager/vine_task.h b/taskvine/src/manager/vine_task.h index ec6889c096..d97e0b9530 100644 --- a/taskvine/src/manager/vine_task.h +++ b/taskvine/src/manager/vine_task.h @@ -18,6 +18,7 @@ End user may only use the API described in taskvine.h #include "list.h" #include "category.h" +#include "uuid.h" #include @@ -130,11 +131,13 @@ struct vine_task { struct rmsummary *resources_requested; /**< Number of cores, disk, memory, time, etc. the task requires. */ struct rmsummary *current_resource_box; /**< Resources allocated to the task on this specific worker. */ - double sandbox_measured; /**< On completion, the maximum size observed of the disk used by the task for output and ephemeral files. */ + double sandbox_measured; /**< On completion, the maximum size observed of the disk used by the task for output and ephemeral files. */ int has_fixed_locations; /**< Whether at least one file was added with the VINE_FIXED_LOCATION flag. Task fails immediately if no worker can satisfy all the strict inputs of the task. */ + int group_id; /**< When enabled, group ID will be assigned based on temp file dependencies of this task */ + int refcount; /**< Number of remaining references to this object. */ }; diff --git a/taskvine/src/manager/vine_task_groups.c b/taskvine/src/manager/vine_task_groups.c new file mode 100644 index 0000000000..055be5c0a9 --- /dev/null +++ b/taskvine/src/manager/vine_task_groups.c @@ -0,0 +1,94 @@ +/* +Copyright (C) 2022- The University of Notre Dame +This software is distributed under the GNU General Public License. +See the file COPYING for details. +*/ + +#include "vine_task_groups.h" +#include "debug.h" +#include "vine_mount.h" +#include "vine_task.h" +#include "stringtools.h" + +// create a new task group for this task based on the temp mount file +static int vine_task_groups_create_group(struct vine_manager *q, struct vine_task *t, struct vine_mount *m) +{ + int id = q->group_id_counter++; + struct list *l = list_create(); + + t->group_id = id; + + struct vine_task *tc = vine_task_addref(t); + + list_push_head(l, tc); + itable_insert(q->task_group_table, id, l); + return 1; +} + +// locate the group with the task which outputs the desired file, and add the new task +static int vine_task_groups_add_to_group(struct vine_manager *q, struct vine_task *t, struct vine_mount *m) +{ + int id = m->file->recovery_task->group_id; + + if (id) { + struct list *group = itable_lookup(q->task_group_table, id); + t->group_id = id; + struct vine_task *tc = vine_task_addref(t); + list_push_tail(group, tc); + } + + return 0; +} + +/* +When a task comes in through vine_submit, look for temp files in its inputs/outputs +If there is a temp file on the input there is already a task group it should be assigned to. +If there is only a temp output it would be the first of a new group. +*/ +int vine_task_groups_assign_task(struct vine_manager *q, struct vine_task *t) +{ + struct vine_mount *input_mount; + struct vine_mount *output_mount; + + int inputs_present = 0; + int outputs_present = 0; + + LIST_ITERATE(t->input_mounts, input_mount) + { + if (input_mount->file->type == VINE_TEMP) { + inputs_present++; + break; + } + } + + LIST_ITERATE(t->output_mounts, output_mount) + { + if (output_mount->file->type == VINE_TEMP) { + outputs_present++; + break; + } + } + + // could also be inputs_present && outputs_present + if (inputs_present) { + vine_task_groups_add_to_group(q, t, input_mount); + debug(D_VINE, "Assigned task to group %d", t->group_id); + } else if (outputs_present) { + vine_task_groups_create_group(q, t, output_mount); + debug(D_VINE, "Create task with group %d", t->group_id); + } + + return inputs_present || outputs_present; +} + +static void vine_task_group_delete(struct list *l) +{ + if (l) { + list_delete(l); + } +} + +void vine_task_groups_clear(struct vine_manager *q) +{ + itable_clear(q->task_group_table, (void *)vine_task_group_delete); +} diff --git a/taskvine/src/manager/vine_task_groups.h b/taskvine/src/manager/vine_task_groups.h new file mode 100644 index 0000000000..9f4d6280d3 --- /dev/null +++ b/taskvine/src/manager/vine_task_groups.h @@ -0,0 +1,14 @@ +/* +Copyright (C) 2022- The University of Notre Dame +This software is distributed under the GNU General Public License. +See the file COPYING for details. +*/ + +#include "taskvine.h" +#include "vine_manager.h" +#include "uuid.h" + + +int vine_task_groups_assign_task(struct vine_manager *q, struct vine_task *t); + +void vine_task_groups_clear(struct vine_manager *q); diff --git a/taskvine/src/worker/vine_sandbox.c b/taskvine/src/worker/vine_sandbox.c index 060869d591..c5133b9a01 100644 --- a/taskvine/src/worker/vine_sandbox.c +++ b/taskvine/src/worker/vine_sandbox.c @@ -35,7 +35,7 @@ Return VINE_STATUS_PROCESSING if some are not ready. Return VINE_STATUS_FAILED if some have definitely failed. */ -vine_cache_status_t vine_sandbox_ensure(struct vine_process *p, struct vine_cache *cache, struct link *manager) +vine_cache_status_t vine_sandbox_ensure(struct vine_process *p, struct vine_cache *cache, struct link *manager, struct itable *procs_table) { int processing = 0; @@ -52,7 +52,30 @@ vine_cache_status_t vine_sandbox_ensure(struct vine_process *p, struct vine_cach break; case VINE_CACHE_STATUS_READY: break; - case VINE_CACHE_STATUS_UNKNOWN: + case VINE_CACHE_STATUS_UNKNOWN: { + struct vine_process *lp; + uint64_t task_id; + int found_file = 0; + ITABLE_ITERATE(procs_table, task_id, lp) + { + struct vine_mount *lm; + LIST_ITERATE(lp->task->output_mounts, lm) + { + if (strcmp(lm->file->cached_name, m->file->cached_name) == 0) { + found_file = 1; + break; + } + } + if (found_file) { + break; + } + } + if (found_file) { + processing++; + break; + } + } + return VINE_CACHE_STATUS_FAILED; case VINE_CACHE_STATUS_FAILED: return VINE_CACHE_STATUS_FAILED; } diff --git a/taskvine/src/worker/vine_sandbox.h b/taskvine/src/worker/vine_sandbox.h index c580e8c490..0c36dd3af5 100644 --- a/taskvine/src/worker/vine_sandbox.h +++ b/taskvine/src/worker/vine_sandbox.h @@ -13,7 +13,7 @@ See the file COPYING for details. char *vine_sandbox_full_path(struct vine_process *p, const char *sandbox_name); -vine_cache_status_t vine_sandbox_ensure( struct vine_process *p, struct vine_cache *c, struct link *manager ); +vine_cache_status_t vine_sandbox_ensure( struct vine_process *p, struct vine_cache *c, struct link *manager, struct itable *procs_table ); int vine_sandbox_stagein( struct vine_process *p, struct vine_cache *c); diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index 5b4bf7577a..61a1033b1b 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -838,7 +838,7 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t char taskname_encoded[VINE_LINE_MAX]; char library_name[VINE_LINE_MAX]; char category[VINE_LINE_MAX]; - int flags, length; + int flags, length, groupid; int64_t n; timestamp_t nt; @@ -891,6 +891,8 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t vine_task_set_disk(task, n); } else if (sscanf(line, "gpus %" PRId64, &n)) { vine_task_set_gpus(task, n); + } else if (sscanf(line, "groupid %d", &groupid)) { + task->group_id = groupid; } else if (sscanf(line, "wall_time %" PRIu64, &nt)) { vine_task_set_time_max(task, nt); } else if (sscanf(line, "end_time %" PRIu64, &nt)) { @@ -1397,8 +1399,8 @@ static int process_ready_to_run_now(struct vine_process *p, struct vine_cache *c return 0; } - vine_cache_status_t status = vine_sandbox_ensure(p, cache, manager); - if (status == VINE_CACHE_STATUS_PROCESSING) + vine_cache_status_t status = vine_sandbox_ensure(p, cache, manager, procs_table); + if (status != VINE_CACHE_STATUS_READY) return 0; return 1; @@ -1442,7 +1444,7 @@ static int process_can_run_eventually(struct vine_process *p, struct vine_cache } } - vine_cache_status_t status = vine_sandbox_ensure(p, cache, manager); + vine_cache_status_t status = vine_sandbox_ensure(p, cache, manager, procs_table); switch (status) { case VINE_CACHE_STATUS_FAILED: case VINE_CACHE_STATUS_UNKNOWN: