Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vine: fixed_location task groups #3787

Merged
merged 23 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion taskvine/src/manager/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ SOURCES = \
vine_current_transfers.c \
vine_file_replica_table.c \
vine_fair.c \
vine_runtime_dir.c
vine_runtime_dir.c \
vine_task_groups.c

PUBLIC_HEADERS = taskvine.h

Expand Down
10 changes: 5 additions & 5 deletions taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ typedef enum {
/** Select overall scheduling algorithm for matching tasks to workers. */
typedef enum {
VINE_SCHEDULE_UNSET = 0, /**< Internal use only. */
VINE_SCHEDULE_FCFS, /**< Select worker on a first-come-first-serve basis. */
VINE_SCHEDULE_FILES, /**< Select worker that has the most data required by the task. (default) */
VINE_SCHEDULE_TIME, /**< Select worker that has the fastest execution time on previous tasks. */
VINE_SCHEDULE_RAND, /**< Select a random worker. */
VINE_SCHEDULE_WORST /**< Select the worst fit worker (the worker with more unused resources). */
VINE_SCHEDULE_FCFS, /**< Select worker on a first-come-first-serve basis. */
VINE_SCHEDULE_FILES, /**< Select worker that has the most data required by the task. (default) */
VINE_SCHEDULE_TIME, /**< Select worker that has the fastest execution time on previous tasks. */
VINE_SCHEDULE_RAND, /**< Select a random worker. */
VINE_SCHEDULE_WORST /**< Select the worst fit worker (the worker with more unused resources). */
} vine_schedule_t;

/** Possible outcomes for a task, returned by @ref vine_task_get_result.
Expand Down
84 changes: 83 additions & 1 deletion taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ See the file COPYING for details.
#include "vine_runtime_dir.h"
#include "vine_schedule.h"
#include "vine_task.h"
#include "vine_task_groups.h"
#include "vine_task_info.h"
#include "vine_taskgraph_log.h"
#include "vine_txn_log.h"
Expand Down Expand Up @@ -2990,6 +2991,43 @@ static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct v
return result;
}

/*
Commit task t to worker w, followed by every other task that is still queued
in t's task group.

If t has a group_id, the list registered under that id in q->task_group_table
is looked up, t is unlinked from it (dropping the reference the list held), and
the remaining members are popped one at a time, taken out of q->ready_tasks,
and committed to the same worker.

A task without a group_id, or whose group_id has no list registered, is
committed on its own.

The batch stops at the first commit that does not return VINE_SUCCESS: the
members that were not attempted stay in the group list and in q->ready_tasks
so that they can be scheduled again later.

Returns the result of the last commit that was attempted.

NOTE(review): when a member other than the first fails, the caller only sees
the failure code and still holds a pointer to the first task; confirm the
caller's failure handling is appropriate for that case.
*/
static vine_result_code_t commit_task_group_to_worker(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
	vine_result_code_t result = VINE_SUCCESS;
	struct list *l = NULL;

	if (t->group_id) {
		debug(D_VINE, "Task %d has GroupID of %s. Should Send:", t->task_id, t->group_id);
		l = hash_table_lookup(q->task_group_table, t->group_id);
	}

	/* The lookup can miss; never iterate or unlink on a NULL list. */
	if (l) {
		struct vine_task *logt;
		LIST_ITERATE(l, logt)
		{
			debug(D_VINE, "Task ID: %d", logt->task_id);
		}

		list_remove(l, t);
		// decrement refcount
		vine_task_delete(t);
	}

	int counter = 0;
	for (;;) {
		result = commit_task_to_worker(q, w, t);
		counter++;

		/* Do not keep dispatching group members once a commit has failed. */
		if (result != VINE_SUCCESS || !l) {
			break;
		}

		t = list_pop_head(l);
		if (!t) {
			break;
		}

		/* The next member leaves the ready queue before it is committed. */
		int t_idx = priority_queue_find_idx(q->ready_tasks, t);
		priority_queue_remove(q->ready_tasks, t_idx);
		// decrement refcount
		vine_task_delete(t);
	}

	debug(D_VINE, "Sent batch of %d tasks to worker %s", counter, w->hostname);
	return result;
}

/* 1 if task resubmitted, 0 otherwise */
static int resubmit_task_on_exhaustion(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
Expand Down Expand Up @@ -3297,6 +3335,9 @@ static void vine_manager_consider_recovery_task(struct vine_manager *q, struct v
if (!rt)
return;

/* Do not try to group recovery tasks */
rt->group_id = NULL;

switch (rt->state) {
case VINE_TASK_INITIAL:
/* The recovery task has never been run, so submit it now. */
Expand Down Expand Up @@ -3438,7 +3479,26 @@ static int send_one_task(struct vine_manager *q)

if (w) {
priority_queue_remove(q->ready_tasks, t_idx);
vine_result_code_t result = commit_task_to_worker(q, w, t);

// do not continue if this worker is running a group task
if (q->task_groups_enabled) {
struct vine_task *it;
uint64_t taskid;
ITABLE_ITERATE(w->current_tasks, taskid, it)
{
if (it->group_id) {
return 0;
}
}
}

vine_result_code_t result;
if (q->task_groups_enabled) {
result = commit_task_group_to_worker(q, w, t);
} else {
result = commit_task_to_worker(q, w, t);
}

switch (result) {
case VINE_SUCCESS:
/* return on successful commit. */
Expand Down Expand Up @@ -3943,6 +4003,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert

q->factory_table = hash_table_create(0, 0);
q->current_transfer_table = hash_table_create(0, 0);
q->task_group_table = hash_table_create(0, 0);
q->fetch_factory = 0;

q->measured_local_resources = rmsummary_create(-1);
Expand Down Expand Up @@ -4008,6 +4069,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
// peer transfers enabled by default
q->peer_transfers_enabled = 1;

q->task_groups_enabled = 0;

q->load_from_shared_fs_enabled = 0;

q->file_source_max_transfers = VINE_FILE_SOURCE_MAX_TRANSFERS;
Expand Down Expand Up @@ -4285,6 +4348,9 @@ void vine_delete(struct vine_manager *q)
vine_current_transfers_clear(q);
hash_table_delete(q->current_transfer_table);

vine_task_groups_clear(q);
hash_table_delete(q->task_group_table);

itable_clear(q->tasks, (void *)delete_task_at_exit);
itable_delete(q->tasks);

Expand Down Expand Up @@ -4654,6 +4720,11 @@ int vine_submit(struct vine_manager *q, struct vine_task *t)
vine_task_set_scheduler(t, VINE_SCHEDULE_FILES);
}

/* Attempt to group this task based on temp dependencies. */
if (q->task_groups_enabled) {
vine_task_groups_assign_task(q, t);
}

/* If the task produces temporary files, create recovery tasks for those. */
vine_manager_create_recovery_tasks(q, t);

Expand Down Expand Up @@ -5472,6 +5543,14 @@ int vine_cancel_by_task_id(struct vine_manager *q, int task_id)
return 0;
}

if (task->group_id && (task->refcount > 1)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need `task->group_id = NULL;` inside this `if`?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so since the task is getting deleted afterwards

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to have the if (task->group_id) { only, set it to NULL inside the if, and do not check for the refcount. The only place where we should be checking for the refcount is when deleting the task, otherwise it gets tricky if we ever call functions in a different context.

struct list *l = hash_table_lookup(q->task_group_table, task->group_id);
if (l) {
list_remove(l, task);
}
vine_task_delete(task);
}

reset_task_to_state(q, task, VINE_TASK_RETRIEVED);

task->result = VINE_RESULT_CANCELLED;
Expand Down Expand Up @@ -5673,6 +5752,9 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if (!strcmp(name, "update-interval")) {
q->update_interval = MAX(1, (int)value);

} else if (!strcmp(name, "task-groups")) {
q->task_groups_enabled = MIN(1, (int)value);

} else if (!strcmp(name, "resource-management-interval")) {
q->resource_management_interval = MAX(1, (int)value);

Expand Down
4 changes: 4 additions & 0 deletions taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ struct vine_manager {
struct hash_table *workers_with_watched_file_updates; /* Maps link -> vine_worker_info */
struct hash_table *workers_with_complete_tasks; /* Maps link -> vine_worker_info */
struct hash_table *current_transfer_table; /* Maps uuid -> struct transfer_pair */
struct hash_table *task_group_table; /* Maps uuid -> list vine_task */

/* Primary data structures for tracking files. */

Expand Down Expand Up @@ -182,6 +183,9 @@ struct vine_manager {
int tasks_waiting_last_hungry; /* Number of tasks originally waiting when call to vine_hungry_computation was made. */
timestamp_t hungry_check_interval; /* Maximum interval between vine_hungry_computation checks. */

/* Task Groups Configuration */
int task_groups_enabled;

/* Various performance knobs that can be tuned. */
int short_timeout; /* Timeout in seconds to send/recv a brief message from worker */
int long_timeout; /* Timeout if in the middle of an incomplete message. */
Expand Down
4 changes: 4 additions & 0 deletions taskvine/src/manager/vine_manager_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,10 @@ vine_result_code_t vine_manager_put_task(
}
}

if (t->group_id) {
vine_manager_send(q, w, "groupid %s\n", t->group_id);
}

// vine_manager_send returns the number of bytes sent, or a number less than
// zero to indicate errors. We are lazy here, we only check the last
// message we sent to the worker (other messages may have failed above).
Expand Down
6 changes: 6 additions & 0 deletions taskvine/src/manager/vine_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ struct vine_task *vine_task_create(const char *command_line)
t->priority = 0;

vine_counters.task.created++;
t->group_id = 0;

return t;
}
Expand Down Expand Up @@ -256,6 +257,11 @@ struct vine_task *vine_task_copy(const struct vine_task *task)
new->resources_requested = rmsummary_copy(task->resources_requested, 0);
}

/* Group ID is copied. */
if (task->group_id) {
new->group_id = strdup(task->group_id);
}

return new;
}

Expand Down
5 changes: 4 additions & 1 deletion taskvine/src/manager/vine_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ End user may only use the API described in taskvine.h

#include "list.h"
#include "category.h"
#include "uuid.h"

#include <stdint.h>

Expand Down Expand Up @@ -130,11 +131,13 @@ struct vine_task {
struct rmsummary *resources_requested; /**< Number of cores, disk, memory, time, etc. the task requires. */
struct rmsummary *current_resource_box; /**< Resources allocated to the task on this specific worker. */

double sandbox_measured; /**< On completion, the maximum size observed of the disk used by the task for output and ephemeral files. */
double sandbox_measured; /**< On completion, the maximum size observed of the disk used by the task for output and ephemeral files. */

int has_fixed_locations; /**< Whether at least one file was added with the VINE_FIXED_LOCATION flag. Task fails immediately if no
worker can satisfy all the strict inputs of the task. */

char *group_id; /**< When enabled, group ID will be assigned based on temp file dependencies of this task */

int refcount; /**< Number of remaining references to this object. */
};

Expand Down
96 changes: 96 additions & 0 deletions taskvine/src/manager/vine_task_groups.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright (C) 2022- The University of Notre Dame
This software is distributed under the GNU General Public License.
See the file COPYING for details.
*/

#include "vine_task_groups.h"
#include "debug.h"
#include "vine_mount.h"
#include "vine_task.h"
#include "stringtools.h"

/*
Create a new task group whose first member is task t.

The group id is a decimal string made from a function-local counter that
starts at 1 and is incremented on every call.  The string is stored in
t->group_id and also used as the key under which the group's task list is
inserted into q->task_group_table.  A new reference to t (vine_task_addref)
is pushed onto the head of the newly created list.

The mount m is not read by this function.

Always returns 1.
*/
static int vine_task_groups_create_group(struct vine_manager *q, struct vine_task *t, struct vine_mount *m)
{
/* Persists across calls, so ids are unique for the lifetime of the process. */
static int group_id_counter = 1;
colinthomas-z80 marked this conversation as resolved.
Show resolved Hide resolved

/* string_format returns a newly formatted string; t->group_id keeps this pointer below. */
char *id = string_format("%d", group_id_counter++);
struct list *l = list_create();

t->group_id = id;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is group_id a string? Let's keep an int if possible so there is less chance of memory leaks/segfaults.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is because the group_id is a key to a hash table. They used to be a full uuid string but that was deemed to be excessive since the number of groups does not get very large.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest using struct itable then. It is like struct hash_table but the keys are ints.


struct vine_task *tc = vine_task_addref(t);

list_push_head(l, tc);
hash_table_insert(q->task_group_table, id, l);
return 1;
}

/*
Add task t to the group of the task that outputs the file mounted at m.

The producing task is taken from m->file->recovery_task, and its group_id is
used to look up the group's task list in q->task_group_table.  When all of
these exist, t->group_id is set to the producer's id (the same pointer, not a
copy) and a new reference to t is appended to the tail of the group list.

If the file has no recovery task, the producer has no group id, or no list is
registered under that id, t is left ungrouped and nothing is modified.

Always returns 0.
*/
static int vine_task_groups_add_to_group(struct vine_manager *q, struct vine_task *t, struct vine_mount *m)
{
	struct vine_task *producer = m->file->recovery_task;

	/* No known producer, or the producer is not part of any group. */
	if (!producer || !producer->group_id) {
		return 0;
	}

	struct list *group = hash_table_lookup(q->task_group_table, producer->group_id);

	/* Only tag the task once we know there is a list to hold it. */
	if (!group) {
		return 0;
	}

	t->group_id = producer->group_id;
	list_push_tail(group, vine_task_addref(t));

	return 0;
}

/*
When a task comes in through vine_submit, look for temp files in its inputs/outputs
If there is a temp file on the input there is already a task group it should be assigned to.
If there is only a temp output it would be the first of a new group.
*/
int vine_task_groups_assign_task(struct vine_manager *q, struct vine_task *t)
{
struct vine_mount *input_mount;
struct vine_mount *output_mount;

int inputs_present = 0;
int outputs_present = 0;

LIST_ITERATE(t->input_mounts, input_mount)
{
if (input_mount->file->type == VINE_TEMP) {
inputs_present++;
break;
}
}

LIST_ITERATE(t->output_mounts, output_mount)
{
if (output_mount->file->type == VINE_TEMP) {
outputs_present++;
break;
}
}

// could also be inputs_present && outputs_present
if (inputs_present) {
vine_task_groups_add_to_group(q, t, input_mount);
debug(D_VINE, "Assigned task to group %s", t->group_id);
} else if (outputs_present) {
vine_task_groups_create_group(q, t, output_mount);
debug(D_VINE, "Create task with group %s", t->group_id);
}

return inputs_present || outputs_present;
}

/*
Destroy one task-group list.  Used as the per-value callback when the task
group table is cleared.

Every task is placed in a group list through vine_task_addref, so each member
still in the list owns one reference.  Those references are released here
before the list itself is deleted; otherwise the tasks could never reach a
refcount of zero.

A NULL list is ignored.
*/
static void vine_task_group_delete(struct list *l)
{
	if (!l) {
		return;
	}

	struct vine_task *t;
	while ((t = list_pop_head(l))) {
		// decrement refcount
		vine_task_delete(t);
	}

	list_delete(l);
}

/*
Remove every task group registered with manager q, handing each group's list
to vine_task_group_delete.  The table object itself is not deleted here.
*/
void vine_task_groups_clear(struct vine_manager *q)
{
	struct hash_table *groups = q->task_group_table;

	hash_table_clear(groups, (void *)vine_task_group_delete);
}
14 changes: 14 additions & 0 deletions taskvine/src/manager/vine_task_groups.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
Copyright (C) 2022- The University of Notre Dame
This software is distributed under the GNU General Public License.
See the file COPYING for details.
*/

#ifndef VINE_TASK_GROUPS_H
#define VINE_TASK_GROUPS_H

#include "taskvine.h"
#include "vine_manager.h"
#include "uuid.h"

/*
Assign a newly submitted task to a task group based on its temp files.
A task with a temp input is offered to the group of the task producing that
file; otherwise a task with a temp output starts a new group.
Returns non-zero if the task has any temp input or output, zero otherwise.
*/
int vine_task_groups_assign_task(struct vine_manager *q, struct vine_task *t);

/*
Remove all task groups from the manager's task group table.
The table itself is left for the caller to delete.
*/
void vine_task_groups_clear(struct vine_manager *q);

#endif
Loading
Loading