Skip to content

Commit

Permalink
task group prototyping
Browse files Browse the repository at this point in the history
  • Loading branch information
colinthomas-z80 committed Apr 26, 2024
1 parent 6423918 commit f4de28d
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 2 deletions.
3 changes: 2 additions & 1 deletion taskvine/src/manager/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ SOURCES = \
vine_current_transfers.c \
vine_file_replica_table.c \
vine_fair.c \
vine_runtime_dir.c
vine_runtime_dir.c \
vine_task_groups.c

PUBLIC_HEADERS = taskvine.h

Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ typedef enum {
VINE_SCHEDULE_UNSET = 0, /**< Internal use only. */
VINE_SCHEDULE_FCFS, /**< Select worker on a first-come-first-serve basis. */
VINE_SCHEDULE_FILES, /**< Select worker that has the most data required by the task. (default) */
VINE_SCHEDULE_GROUPS, /**< Select a worker running a task in the task group */
VINE_SCHEDULE_TIME, /**< Select worker that has the fastest execution time on previous tasks. */
VINE_SCHEDULE_RAND, /**< Select a random worker. */
VINE_SCHEDULE_WORST /**< Select the worst fit worker (the worker with more unused resources). */
Expand Down
51 changes: 50 additions & 1 deletion taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ See the file COPYING for details.
#include "vine_runtime_dir.h"
#include "vine_schedule.h"
#include "vine_task.h"
#include "vine_task_groups.h"
#include "vine_task_info.h"
#include "vine_taskgraph_log.h"
#include "vine_txn_log.h"
Expand Down Expand Up @@ -2708,6 +2709,41 @@ static vine_result_code_t start_one_task(struct vine_manager *q, struct vine_wor
return result;
}

/*
Start one task on a given worker by specializing the task to the worker,
sending the appropriate input files, and then sending the details of the task.
Note that the "infile" and "outfile" components of the task refer to
files that have already been uploaded into the worker's cache by the manager.
*/

static vine_result_code_t start_group_task(
struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, struct list *l)
{
vine_result_code_t result = 0;
struct vine_task *lt;
LIST_ITERATE(l, lt)
{
struct rmsummary *limits = vine_manager_choose_resources_for_task(q, w, t);
char *command_line;

if (q->monitor_mode && !t->needs_library) {
command_line = vine_monitor_wrap(q, w, t, limits);
} else {
command_line = xxstrdup(t->command_line);
}
result = vine_manager_put_task(q, w, lt, command_line, limits, 0);
free(command_line);
if (result == VINE_SUCCESS) {
t->current_resource_box = limits;
rmsummary_merge_override_basic(t->resources_allocated, limits);
debug(D_VINE, "%s (%s) busy on group '%s'", w->hostname, w->addrport, t->command_line);
} else {
rmsummary_delete(limits);
}
}
return result;
}

static void count_worker_resources(struct vine_manager *q, struct vine_worker_info *w)
{
w->resources->cores.inuse = 0;
Expand Down Expand Up @@ -2815,7 +2851,16 @@ static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct v
t->addrport = xxstrdup(w->addrport);

t->time_when_commit_start = timestamp_get();
vine_result_code_t result = start_one_task(q, w, t);
vine_result_code_t result;
struct list *l = 0;
if (t->group_id) {
l = hash_table_lookup(q->task_group_table, t->group_id);
}
if (l && list_size(l) > 1) {
result = start_group_task(q, w, t, l);
} else {
result = start_one_task(q, w, t);
}
t->time_when_commit_end = timestamp_get();

itable_insert(w->current_tasks, t->task_id, t);
Expand Down Expand Up @@ -3778,6 +3823,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert

q->factory_table = hash_table_create(0, 0);
q->current_transfer_table = hash_table_create(0, 0);
q->task_group_table = hash_table_create(0, 0);
q->fetch_factory = 0;

q->measured_local_resources = rmsummary_create(-1);
Expand Down Expand Up @@ -4522,6 +4568,9 @@ int vine_submit(struct vine_manager *q, struct vine_task *t)
/* Ensure category structure is created. */
vine_category_lookup_or_create(q, t->category);

/* Attempt to group this task based on temp dependencies. */
vine_task_groups_assign_task(q, t);

change_task_state(q, t, VINE_TASK_READY);

t->time_when_submitted = timestamp_get();
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ struct vine_manager {
struct hash_table *factory_table; /* Maps factory_name -> vine_factory_info */
struct hash_table *workers_with_available_results; /* Maps link -> vine_worker_info */
struct hash_table *current_transfer_table; /* Maps uuid -> struct transfer_pair */
struct hash_table *task_group_table; /* Maps uuid -> list vine_task */

/* Primary data structures for tracking files. */

Expand Down
57 changes: 57 additions & 0 deletions taskvine/src/manager/vine_schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,61 @@ static struct vine_worker_info *find_worker_by_files(struct vine_manager *q, str
return best_worker;
}

/*
Pick a worker for a task under the VINE_SCHEDULE_GROUPS policy.

The selection below is driven purely by cached input data: it does not
consult the task's group membership.

Workers are visited starting from a random position. Outside of ramp-down,
the first acceptable worker that is not missing any input cached beyond task
level is returned immediately. Otherwise the acceptable worker holding the
most bytes of the task's VINE_FILE inputs wins; during ramp-down, ties are
broken in favour of the candidate reported as a worse fit.

Returns the selected worker, or 0 if no worker passes
check_worker_against_task.
*/

static struct vine_worker_info *find_worker_by_task_groups(struct vine_manager *q, struct vine_task *t)
{
	char *worker_key;
	struct vine_worker_info *candidate;
	struct vine_worker_info *chosen = 0;
	int64_t chosen_bytes = 0;
	int iter_offset;

	int ramp_down = vine_schedule_in_ramp_down(q);

	HASH_TABLE_ITERATE_RANDOM_START(q->worker_table, iter_offset, worker_key, candidate)
	{
		/* Careful: if the check fails, candidate may no longer be valid, so do not touch it. */
		if (!check_worker_against_task(q, candidate, t)) {
			continue;
		}

		int64_t cached_bytes = 0;
		int missing_cacheable = 0;
		struct vine_mount *mount;

		LIST_ITERATE(t->input_mounts, mount)
		{
			struct vine_file_replica *rep = hash_table_lookup(candidate->current_files, mount->file->cached_name);

			if (rep && mount->file->type == VINE_FILE) {
				cached_bytes += rep->size;
				continue;
			}

			/* Not counted above: it is a miss only if the file is cached beyond task level. */
			if (mount->file->cache_level > VINE_CACHE_LEVEL_TASK) {
				missing_cacheable = 1;
			}
		}

		/* Outside ramp-down, a worker missing nothing cacheable is taken right away. */
		if (!missing_cacheable && !ramp_down) {
			return candidate;
		}

		int replace;
		if (!chosen) {
			replace = 1;
		} else if (cached_bytes > chosen_bytes) {
			replace = 1;
		} else {
			replace = ramp_down && cached_bytes == chosen_bytes && candidate_has_worse_fit(chosen, candidate);
		}

		if (replace) {
			chosen = candidate;
			chosen_bytes = cached_bytes;
		}
	}

	return chosen;
}

/*
Find the first available worker in first-come, first-served order.
Since the order of workers in the hashtable is somewhat arbitrary,
Expand Down Expand Up @@ -444,6 +499,8 @@ struct vine_worker_info *vine_schedule_task_to_worker(struct vine_manager *q, st
return find_worker_by_worst_fit(q, t);
case VINE_SCHEDULE_FCFS:
return find_worker_by_fcfs(q, t);
case VINE_SCHEDULE_GROUPS:
return find_worker_by_task_groups(q, t);
case VINE_SCHEDULE_RAND:
default:
return find_worker_by_random(q, t);
Expand Down
7 changes: 7 additions & 0 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ struct vine_task *vine_task_create(const char *command_line)

t->refcount = 1;

t->group_id = 0;

return t;
}

Expand Down Expand Up @@ -244,6 +246,11 @@ struct vine_task *vine_task_copy(const struct vine_task *task)
new->resources_requested = rmsummary_copy(task->resources_requested, 0);
}

/* Group ID is copied. */
if (task->group_id) {
new->group_id = strdup(task->group_id);
}

return new;
}

Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/manager/vine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ End user may only use the API described in taskvine.h

#include "list.h"
#include "category.h"
#include "uuid.h"

#include <stdint.h>

Expand Down Expand Up @@ -118,6 +119,7 @@ struct vine_task {

int has_fixed_locations; /**< Whether at least one file was added with the VINE_FIXED_LOCATION flag. Task fails immediately if no
worker can satisfy all the strict inputs of the task. */
char *group_id;

int refcount; /**< Number of remaining references to this object. */
};
Expand Down
86 changes: 86 additions & 0 deletions taskvine/src/manager/vine_task_groups.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright (C) 2022- The University of Notre Dame
This software is distributed under the GNU General Public License.
See the file COPYING for details.
*/

#include "vine_task_groups.h"
#include "debug.h"
#include "vine_mount.h"
#include "vine_task.h"

// Start a new task group whose first member is a copy of t.
// A fresh UUID string becomes both t->group_id and the key under which the
// member list is stored in q->task_group_table. The mount m is currently unused.
// Always returns 1.
static int vine_task_groups_create_group(struct vine_manager *q, struct vine_task *t, struct vine_mount *m)
{
	cctools_uuid_t group_uuid;
	cctools_uuid_create(&group_uuid);

	// Tag the task before copying it so the copy carries the same group id.
	t->group_id = strdup(group_uuid.str);

	struct list *members = list_create();
	list_push_head(members, vine_task_copy(t));

	hash_table_insert(q->task_group_table, t->group_id, members);
	return 1;
}

// Find the group containing a task that produces the file behind mount m,
// and append a copy of t to that group.
//
// The group lists hold struct vine_task entries, so each member's output
// mounts are searched for m->file. (Comparing the list entries themselves
// against m->file can never match, since they are tasks and not files.)
//
// Returns 1 if t was added to a group, 0 if no group produces the file.
// NOTE(review): t->group_id is not assigned here; confirm whether tasks that
// join an existing group should also carry the group's id.
static int vine_task_groups_add_to_group(struct vine_manager *q, struct vine_task *t, struct vine_mount *m)
{
	struct list *l;
	char *id;

	HASH_TABLE_ITERATE(q->task_group_table, id, l)
	{
		struct vine_task *member;
		LIST_ITERATE(l, member)
		{
			struct vine_mount *out;
			LIST_ITERATE(member->output_mounts, out)
			{
				// Files are matched by identity, as in the original comparison.
				if (out->file == m->file) {
					struct vine_task *tc = vine_task_copy(t);
					list_push_tail(l, tc);
					return 1;
				}
			}
		}
	}

	return 0;
}

/*
Called from vine_submit to place a task into a task group based on its
temporary (VINE_TEMP) files.

A task with a temp input is offered to the existing group that should already
hold the producer of that file. A task with no temp input but a temp output
starts a new group. Only the first temp mount found in each list is used.

Returns nonzero if the task has any temp input or output. The outcome of the
attempt to join an existing group is not reflected in the return value.
*/
int vine_task_groups_assign_task(struct vine_manager *q, struct vine_task *t)
{
	struct vine_mount *mount;
	struct vine_mount *temp_input = 0;
	struct vine_mount *temp_output = 0;

	LIST_ITERATE(t->input_mounts, mount)
	{
		if (mount->file->type == VINE_TEMP) {
			temp_input = mount;
			break;
		}
	}

	LIST_ITERATE(t->output_mounts, mount)
	{
		if (mount->file->type == VINE_TEMP) {
			temp_output = mount;
			break;
		}
	}

	// A temp input takes precedence; the condition could also require both an input and an output.
	if (temp_input) {
		vine_task_groups_add_to_group(q, t, temp_input);
	} else if (temp_output) {
		vine_task_groups_create_group(q, t, temp_output);
	}

	return temp_input != 0 || temp_output != 0;
}
12 changes: 12 additions & 0 deletions taskvine/src/manager/vine_task_groups.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
Copyright (C) 2022- The University of Notre Dame
This software is distributed under the GNU General Public License.
See the file COPYING for details.
*/

#include "taskvine.h"
#include "vine_manager.h"
#include "uuid.h"


int vine_task_groups_assign_task(struct vine_manager *q, struct vine_task *t);

0 comments on commit f4de28d

Please sign in to comment.