diff --git a/doc/manuals/taskvine/index.md b/doc/manuals/taskvine/index.md
index 2490ad873b..8904d91635 100644
--- a/doc/manuals/taskvine/index.md
+++ b/doc/manuals/taskvine/index.md
@@ -1928,8 +1928,8 @@ Consider now that the task requires 1 cores, 6GB of memory, and 27 GB of disk:
!!! note
If you want TaskVine to exactly allocate the resources you have
- specified, use the `proportional-resources` and `proportional-whole-tasks`
- parameters as shown [here](#tuning-specialized-execution-parameters). In
+ specified, use `m.disable_proportional_resources()` (see also `proportional-whole-tasks`
+ [here](#tuning-specialized-execution-parameters). In
general, however, we have found that using proportions nicely adapts to the
underlying available resources, and leads to very few resource exhaustion
failures while still using worker resources efficiently.
@@ -2535,10 +2535,10 @@ change.
| min-transfer-timeout | Set the minimum number of seconds to wait for files to be transferred to or from a worker. | 10 |
| monitor-interval | Maximum number of seconds between resource monitor measurements. If less than 1, use default. | 5 |
| prefer-dispatch | If 1, try to dispatch tasks even if there are retrieved tasks ready to be reportedas done. | 0 |
-| proportional-resources | If set to 0, do not assign resources proportionally to tasks. The default is to use proportions. (See [task resources.](#task-resources) | 1 |
| proportional-whole-tasks | Round up resource proportions such that only an integer number of tasks could be fit in the worker. The default is to use proportions. (See [task resources.](#task-resources) | 1 |
| ramp-down-heuristic | If set to 1 and there are more workers than tasks waiting, then tasks are allocated all the free resources of a worker large enough to run them. If monitoring watchdog is not enabled, then this heuristic has no effect. | 0 |
| resource-submit-multiplier | Assume that workers have `resource x resources-submit-multiplier` available.
This overcommits resources at the worker, causing tasks to be sent to workers that cannot be immediately executed.
The extra tasks wait at the worker until resources become available. | 1 |
+| sandbox-grow-factor | When task disk sandboxes are exhausted, increase the allocation using their measured valued times this factor. Minimum is 1.1. | 2 |
| short-timeout | Set the minimum timeout in seconds when sending a brief message to a single worker. | 5 |
| temp-replica-count | Number of temp file replicas created across workers | 0 |
| transfer-outlier-factor | Transfer that are this many times slower than the average will be terminated. | 10 |
diff --git a/dttools/src/category.c b/dttools/src/category.c
index 29c2c97611..570bba3beb 100644
--- a/dttools/src/category.c
+++ b/dttools/src/category.c
@@ -921,6 +921,11 @@ const struct rmsummary *category_task_min_resources(struct category *c, struct r
/* but don't go below the minimum defined for the category. */
rmsummary_merge_max(internal, c->min_allocation);
+ /* nor below the observed sandboxes if not in an auto mode */
+ if (c->allocation_mode == CATEGORY_ALLOCATION_MODE_FIXED && user && user->disk < 0) {
+ internal->disk = MAX(internal->disk, c->min_vine_sandbox);
+ }
+
return internal;
}
diff --git a/dttools/src/category.h b/dttools/src/category.h
index 0779d4fecf..91b7ed51d9 100644
--- a/dttools/src/category.h
+++ b/dttools/src/category.h
@@ -104,6 +104,9 @@ struct category {
/* stats for taskvine */
struct vine_stats *vine_stats;
+ /* Max sandbox disk space observed, in MB. This is the minimum sandbox size needed if nothing else is known about the task.*/
+ int64_t min_vine_sandbox;
+
/* variables for makeflow */
/* Mappings between variable names defined in the makeflow file and their values. */
struct hash_table *mf_variables;
diff --git a/dttools/src/macros.h b/dttools/src/macros.h
index 640c927fb8..43e617f8c3 100644
--- a/dttools/src/macros.h
+++ b/dttools/src/macros.h
@@ -39,6 +39,13 @@ See the file COPYING for details.
#define TERABYTE TERA
#define PETABYTE PETA
+#define BYTES_TO_STORAGE_UNIT(x, unit) (((double) x) / unit)
+#define BYTES_TO_KILOBYTES(x) BYTES_TO_STORAGE_UNIT(x, KILOBYTE)
+#define BYTES_TO_MEGABYTES(x) BYTES_TO_STORAGE_UNIT(x, MEGABYTE)
+#define BYTES_TO_GIGABYTES(x) BYTES_TO_STORAGE_UNIT(x, GIGABYTE)
+#define BYTES_TO_TERABYTES(x) BYTES_TO_STORAGE_UNIT(x, TERABYTE)
+#define BYTES_TO_PETABYTES(x) BYTES_TO_STORAGE_UNIT(x, PETABYTE)
+
#define USECOND 1000000
#endif
diff --git a/dttools/src/path_disk_size_info.c b/dttools/src/path_disk_size_info.c
index 55b8b401a1..79c7cce49f 100644
--- a/dttools/src/path_disk_size_info.c
+++ b/dttools/src/path_disk_size_info.c
@@ -23,7 +23,7 @@ struct DIR_with_name {
char *name;
};
-int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files)
+int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files, struct hash_table *exclude_paths)
{
struct stat info;
@@ -31,7 +31,7 @@ int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *n
if (result == 0) {
if (S_ISDIR(info.st_mode)) {
struct path_disk_size_info *state = NULL;
- result = path_disk_size_info_get_r(path, -1, &state);
+ result = path_disk_size_info_get_r(path, -1, &state, exclude_paths);
*measured_size = state->last_byte_size_complete;
*number_of_files = state->last_file_count_complete;
@@ -46,7 +46,7 @@ int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *n
return result;
}
-int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state)
+int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state, struct hash_table *exclude_paths)
{
int64_t start_time = time(0);
int result = 0;
@@ -115,6 +115,10 @@ int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_di
snprintf(composed_path, PATH_MAX, "%s/%s", tail->name, entry->d_name);
}
+ if (exclude_paths && hash_table_lookup(exclude_paths, composed_path)) {
+ continue;
+ }
+
if (lstat(composed_path, &file_info) < 0) {
if (errno == ENOENT) {
/* our DIR structure is stale, and a file went away. We simply do nothing. */
diff --git a/dttools/src/path_disk_size_info.h b/dttools/src/path_disk_size_info.h
index f2c2f2d674..2208b962b0 100644
--- a/dttools/src/path_disk_size_info.h
+++ b/dttools/src/path_disk_size_info.h
@@ -9,6 +9,7 @@ See the file COPYING for details.
#include "int_sizes.h"
#include "list.h"
+#include "hash_table.h"
struct path_disk_size_info {
int complete_measurement;
@@ -29,9 +30,10 @@ Query disk space on the given directory.
@param path Directory to be measured.
@param *measured_size A pointer to an integer that will be filled with the total space in bytes.
@param *number_of_files A pointer to an integer that will be filled with the total number of files, directories, and symbolic links.
+@param exclude_paths Hash table with strings of paths that should not be measured. Values of the hash table are ignored.
@return zero on success, -1 if an error is encounterd (see errno).
*/
-int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files);
+int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files, struct hash_table *exclude_paths);
/** Get a (perhaps partial) disk usage on path, but measure by max_secs at a time.
If *state is NULL, start a new measurement, otherwise continue from
@@ -40,9 +42,11 @@ When the function returns, if *state->complete_measurement is 1, then the measur
@param path Directory to be measured.
@param max_secs Maximum number of seconds to spend in the measurement.
@param *state State of the measurement.
+@param exclude_paths Hash table with strings of paths that should not be measured. Values of the hash table are ignored.
@return zero on success, -1 if an error is encounterd (see errno).
*/
-int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state);
+int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state, struct hash_table *exclude_paths);
+
void path_disk_size_info_delete_state(struct path_disk_size_info *state);
diff --git a/dttools/src/rmonitor_poll.c b/dttools/src/rmonitor_poll.c
index 9a25a2b626..532e5fb5d2 100644
--- a/dttools/src/rmonitor_poll.c
+++ b/dttools/src/rmonitor_poll.c
@@ -780,7 +780,7 @@ int rmonitor_get_wd_usage(struct rmonitor_wdir_info *d, int max_time_for_measure
{
/* We need a pointer to a pointer, which it is not possible from a struct. Use a dummy variable. */
struct path_disk_size_info *state = d->state;
- int status = path_disk_size_info_get_r(d->path, max_time_for_measurement, &state);
+ int status = path_disk_size_info_get_r(d->path, max_time_for_measurement, &state, NULL);
d->state = state;
@@ -945,7 +945,7 @@ struct rmsummary *rmonitor_measure_host(char *path)
struct rmsummary *tr = rmsummary_create(-1);
if (path) {
- path_disk_size_info_get(path, &total_disk, &file_count);
+ path_disk_size_info_get(path, &total_disk, &file_count, NULL);
tr->disk = ((double)total_disk) / ONE_MEGABYTE;
tr->total_files = file_count;
}
diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h
index cdcfaf6ec3..f74e2d8f9a 100644
--- a/taskvine/src/manager/taskvine.h
+++ b/taskvine/src/manager/taskvine.h
@@ -105,7 +105,8 @@ typedef enum {
VINE_RESULT_FIXED_LOCATION_MISSING = 10 << 3, /**< The task failed because no worker could satisfy the fixed
location input file requirements. */
VINE_RESULT_CANCELLED = 11 << 3, /**< The task was cancelled by the caller. */
- VINE_RESULT_LIBRARY_EXIT = 12 << 3 /**< Task is a library that has terminated. **/
+ VINE_RESULT_LIBRARY_EXIT = 12 << 3, /**< Task is a library that has terminated. **/
+ VINE_RESULT_SANDBOX_EXHAUSTION = 13 << 3 /**< The task used more disk than the allowed sandbox. **/
} vine_result_t;
/** Select how to allocate resources for similar tasks with @ref vine_set_category_mode */
@@ -1088,6 +1089,21 @@ int vine_enable_peer_transfers(struct vine_manager *m);
/** Disable taskvine peer transfers to be scheduled by the manager **/
int vine_disable_peer_transfers(struct vine_manager *m);
+/** When enabled, resources to tasks in are assigned in proportion to the size
+of the worker. If a resource is specified (e.g. with @ref vine_task_set_cores),
+proportional resources never go below explicit specifications. This mode is most
+useful when only some of the resources are explicitely specified, or
+with automatic resource allocation. By default it is enabled.
+@param m A manager object
+ **/
+int vine_enable_proportional_resources(struct vine_manager *m);
+
+/** Disable proportional resources. See @ref vine_enable_proportional_resources.
+ * Proportional resources are enabled by default.
+@param m A manager object
+ **/
+int vine_disable_proportional_resources(struct vine_manager *m);
+
/** Set the minimum task_id of future submitted tasks.
Further submitted tasks are guaranteed to have a task_id larger or equal to
minid. This function is useful to make task_ids consistent in a workflow that
diff --git a/taskvine/src/manager/vine_file_replica_table.c b/taskvine/src/manager/vine_file_replica_table.c
index d577d5def7..1395658331 100644
--- a/taskvine/src/manager/vine_file_replica_table.c
+++ b/taskvine/src/manager/vine_file_replica_table.c
@@ -4,6 +4,8 @@ Copyright (C) 2022- The University of Notre Dame
See the file COPYING for details.
*/
+#include
+
#include "vine_file_replica_table.h"
#include "set.h"
#include "vine_blocklist.h"
@@ -25,6 +27,12 @@ int vine_file_replica_table_insert(struct vine_manager *m, struct vine_worker_in
w->inuse_cache += replica->size;
hash_table_insert(w->current_files, cachename, replica);
+ double prev_available = w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache + replica->size));
+ if (prev_available >= m->current_max_worker->disk) {
+ /* the current worker may have been the one with the maximum available space, so we update it. */
+ m->current_max_worker->disk = w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache));
+ }
+
struct set *workers = hash_table_lookup(m->file_worker_table, cachename);
if (!workers) {
workers = set_create(4);
@@ -44,6 +52,12 @@ struct vine_file_replica *vine_file_replica_table_remove(struct vine_manager *m,
w->inuse_cache -= replica->size;
}
+ double available = w->resources->disk.total - BYTES_TO_MEGABYTES(w->inuse_cache);
+ if (available > m->current_max_worker->disk) {
+ /* the current worker has more space than we knew before for all workers, so we update it. */
+ m->current_max_worker->disk = available;
+ }
+
struct set *workers = hash_table_lookup(m->file_worker_table, cachename);
if (workers) {
diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c
index 6c508354a4..2be3d275a2 100644
--- a/taskvine/src/manager/vine_manager.c
+++ b/taskvine/src/manager/vine_manager.c
@@ -500,7 +500,7 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
struct vine_task *t;
int task_status, exit_status;
uint64_t task_id;
- int64_t output_length, bytes_sent;
+ int64_t output_length, bytes_sent, sandbox_used;
timestamp_t execution_time, start_time, end_time;
timestamp_t observed_execution_time;
@@ -508,13 +508,14 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
// Format: task completion status, exit status (exit code or signal), output length, bytes_sent, execution time,
// task_id
int n = sscanf(line,
- "complete %d %d %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 "",
+ "complete %d %d %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 " %" SCNd64 "",
&task_status,
&exit_status,
&output_length,
&bytes_sent,
&start_time,
&end_time,
+ &sandbox_used,
&task_id);
if (n < 7) {
@@ -591,6 +592,14 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
}
}
+ t->sandbox_measured = sandbox_used;
+
+ /* Update category disk info */
+ struct category *c = vine_category_lookup_or_create(q, t->category);
+ if (sandbox_used > c->min_vine_sandbox) {
+ c->min_vine_sandbox = sandbox_used;
+ }
+
hash_table_insert(q->workers_with_complete_tasks, w->hashkey, w);
}
@@ -1293,6 +1302,9 @@ static int fetch_outputs_from_worker(struct vine_manager *q, struct vine_worker_
resource_monitor_compress_logs(q, t);
}
+ // fill in measured disk as it comes from a different info source.
+ t->resources_measured->disk = MAX(t->resources_measured->disk, t->sandbox_measured);
+
// Finish receiving output.
t->time_when_done = timestamp_get();
@@ -2507,21 +2519,20 @@ static int build_poll_table(struct vine_manager *q)
return n;
}
-/*
- * Use declared dependencies to estimate the minimum disk requriement of a task
- */
-static void vine_manager_estimate_task_disk_min(struct vine_manager *q, struct vine_task *t)
+static void vine_manager_compute_input_size(struct vine_manager *q, struct vine_task *t)
{
- int mb = 0;
+ t->input_files_size = -1;
+
+ int64_t input_size = 0;
struct vine_mount *m;
LIST_ITERATE(t->input_mounts, m)
{
- mb += (m->file->size) / 1e6;
- }
- if (mb > 0) {
- struct category *c = category_lookup_or_create(q->categories, vine_task_get_category(t));
- c->min_allocation->disk = MAX(mb, c->min_allocation->disk);
+ if (m->file->state == VINE_FILE_STATE_CREATED) {
+ input_size += m->file->size;
+ }
}
+
+ t->input_files_size = (int64_t)ceil(((double)input_size) / ONE_MEGABYTE);
}
/*
@@ -2547,50 +2558,80 @@ struct rmsummary *vine_manager_choose_resources_for_task(struct vine_manager *q,
return limits;
}
- if (t->resources_requested->disk < 0) {
- vine_manager_estimate_task_disk_min(q, t);
+ if (t->input_files_size < 0) {
+ vine_manager_compute_input_size(q, t);
}
/* Compute the minimum and maximum resources for this task. */
const struct rmsummary *min = vine_manager_task_resources_min(q, t);
const struct rmsummary *max = vine_manager_task_resources_max(q, t);
+ /* available disk for all sandboxes */
+ int64_t available_disk = w->resources->disk.total - BYTES_TO_MEGABYTES(w->inuse_cache);
+
+ /* do not count the size of input files as available.
+ * TODO: efficiently discount the size of files already at worker. */
+ if (t->input_files_size > 0) {
+ available_disk -= t->input_files_size;
+ }
+
rmsummary_merge_override_basic(limits, max);
int use_whole_worker = 1;
+ int proportional_whole_tasks = q->proportional_whole_tasks;
+ if (t->resources_requested->memory > -1 || t->resources_requested->disk > -1) {
+ /* if mem or disk are specified explicitely, do not expand resources to fill an integer number of tasks. With this,
+ * the task is assigned exactly the memory and disk specified. We do not do this for cores and gpus, as the use case
+ * here is to specify the number of cores and allocated the rest of the resources evenly. */
+ proportional_whole_tasks = 0;
+ }
+
/* Proportionally assign the worker's resources to the task if configured. */
if (q->proportional_resources) {
/* Compute the proportion of the worker the task shall have across resource types. */
double max_proportion = -1;
+ double min_proportion = -1;
+
if (w->resources->cores.total > 0) {
max_proportion = MAX(max_proportion, limits->cores / w->resources->cores.total);
+ min_proportion = MAX(min_proportion, min->cores / w->resources->cores.total);
}
if (w->resources->memory.total > 0) {
max_proportion = MAX(max_proportion, limits->memory / w->resources->memory.total);
+ min_proportion = MAX(min_proportion, min->memory / w->resources->memory.total);
}
- if (w->resources->disk.total > 0) {
- max_proportion = MAX(max_proportion, limits->disk / w->resources->disk.total);
+ if (available_disk > 0) {
+ max_proportion = MAX(max_proportion, limits->disk / available_disk);
+ min_proportion = MAX(min_proportion, min->disk / available_disk);
}
if (w->resources->gpus.total > 0) {
max_proportion = MAX(max_proportion, limits->gpus / w->resources->gpus.total);
+ min_proportion = MAX(min_proportion, min->gpus / w->resources->gpus.total);
}
- /* If max_proportion > 1, then the task does not fit the worker for the
+ /* If a max_proportion was defined, it cannot be less than a proportion using the minimum
+ * resources for the category. If it was defined, then the min_proportion is not relevant as the
+ * task will try to use the whole worker. */
+ if (max_proportion != -1) {
+ max_proportion = MAX(max_proportion, min_proportion);
+ }
+
+ /* If max_proportion or min_proportion > 1, then the task does not fit the worker for the
* specified resources. For the unspecified resources we use the whole
* worker as not to trigger a warning when checking for tasks that can't
* run on any available worker. */
- if (max_proportion > 1) {
+ if (max_proportion > 1 || min_proportion > 1) {
use_whole_worker = 1;
} else if (max_proportion > 0) {
use_whole_worker = 0;
// adjust max_proportion so that an integer number of tasks fit the worker.
- if (q->proportional_whole_tasks) {
+ if (proportional_whole_tasks) {
max_proportion = 1.0 / (floor(1.0 / max_proportion));
}
@@ -2610,21 +2651,21 @@ struct rmsummary *vine_manager_choose_resources_for_task(struct vine_manager *q,
limits->memory = MAX(1, MAX(limits->memory, floor(w->resources->memory.total * max_proportion)));
- /* worker's disk is shared even among tasks that are not running,
+ /* worker's disk is shared evenly among tasks that are not running,
* thus the proportion is modified by the current overcommit
* multiplier */
- limits->disk = MAX(1, MAX(limits->disk, floor((w->resources->disk.total - w->resources->disk.inuse) * max_proportion / q->resource_submit_multiplier)));
+ limits->disk = MAX(1, MAX(limits->disk, floor(available_disk * max_proportion / q->resource_submit_multiplier)));
}
}
- /* If no resource was specified, using whole worker. */
+ /* If no resource was specified, use whole worker. */
if (limits->cores < 1 && limits->gpus < 1 && limits->memory < 1 && limits->disk < 1) {
use_whole_worker = 1;
}
/* At least one specified resource would use the whole worker, thus
* using whole worker for all unspecified resources. */
if ((limits->cores > 0 && limits->cores >= w->resources->cores.total) || (limits->gpus > 0 && limits->gpus >= w->resources->gpus.total) ||
- (limits->memory > 0 && limits->memory >= w->resources->memory.total) || (limits->disk > 0 && limits->disk >= w->resources->disk.total)) {
+ (limits->memory > 0 && limits->memory >= w->resources->memory.total) || (limits->disk > 0 && limits->disk >= available_disk)) {
use_whole_worker = 1;
}
@@ -2645,7 +2686,7 @@ struct rmsummary *vine_manager_choose_resources_for_task(struct vine_manager *q,
}
if (limits->disk <= 0) {
- limits->disk = w->resources->disk.total;
+ limits->disk = available_disk;
}
} else if (vine_schedule_in_ramp_down(q)) {
/* if in ramp down, use all the free space of that worker. note that we don't use
@@ -2658,21 +2699,14 @@ struct rmsummary *vine_manager_choose_resources_for_task(struct vine_manager *q,
}
limits->memory = w->resources->memory.total - w->resources->memory.inuse;
- limits->disk = w->resources->disk.total - w->resources->disk.inuse;
+ limits->disk = available_disk;
}
/* never go below specified min resources. */
rmsummary_merge_max(limits, min);
- /* If the user specified resources, override proportional calculation */
- if (t->resources_requested->cores > 0)
- limits->cores = t->resources_requested->cores;
- if (t->resources_requested->memory > 0)
- limits->memory = t->resources_requested->memory;
- if (t->resources_requested->disk > 0)
- limits->disk = t->resources_requested->disk;
- if (t->resources_requested->gpus > 0)
- limits->gpus = t->resources_requested->gpus;
+ /* assume the user knows what they are doing... */
+ rmsummary_merge_override_basic(limits, t->resources_requested);
return limits;
}
@@ -2751,12 +2785,7 @@ static void count_worker_resources(struct vine_manager *q, struct vine_worker_in
w->resources->gpus.inuse += box->gpus;
}
- char *cachename;
- struct vine_file_replica *replica;
- HASH_TABLE_ITERATE(w->current_files, cachename, replica)
- {
- w->resources->disk.inuse += ((double)replica->size) / 1e6;
- }
+ w->resources->disk.inuse += ceil(BYTES_TO_MEGABYTES(w->inuse_cache));
}
static void update_max_worker(struct vine_manager *q, struct vine_worker_info *w)
@@ -2776,8 +2805,8 @@ static void update_max_worker(struct vine_manager *q, struct vine_worker_info *w
q->current_max_worker->memory = w->resources->memory.total;
}
- if (q->current_max_worker->disk < w->resources->disk.total) {
- q->current_max_worker->disk = w->resources->disk.total;
+ if (q->current_max_worker->disk < (w->resources->disk.total - w->inuse_cache)) {
+ q->current_max_worker->disk = w->resources->disk.total - w->inuse_cache;
}
if (q->current_max_worker->gpus < w->resources->gpus.total) {
@@ -2917,6 +2946,37 @@ static int resubmit_task_on_exhaustion(struct vine_manager *q, struct vine_worke
return 0;
}
+/* 1 if task resubmitted, 0 otherwise */
+static int resubmit_task_on_sandbox_exhaustion(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
+{
+ if (t->result != VINE_RESULT_SANDBOX_EXHAUSTION) {
+ return 0;
+ }
+
+ struct category *c = vine_category_lookup_or_create(q, t->category);
+
+ /* on sandbox exhausted, the resources allocated correspond to the overflown sandbox */
+ double sandbox = t->resources_allocated->disk;
+
+ /* grow sandbox by given factor (default is two) */
+ sandbox *= q->sandbox_grow_factor * sandbox;
+
+ /* take the MAX in case min_vine_sandbox was updated before th result of this task was processed */
+ c->min_vine_sandbox = MAX(c->min_vine_sandbox, sandbox);
+
+ debug(D_VINE, "Task %d exhausted disk sandbox on %s (%s).\n", t->task_id, w->hostname, w->addrport);
+ double max_allowed_disk = MAX(t->resources_requested->disk, c->max_allocation->disk);
+
+ if (max_allowed_disk > -1 && c->min_vine_sandbox < max_allowed_disk) {
+ debug(D_VINE, "Task %d failed given max disk limit for sandbox.\n", t->task_id);
+ return 0;
+ }
+
+ change_task_state(q, t, VINE_TASK_READY);
+
+ return 1;
+}
+
static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
/* in this function, any change_task_state should only be to VINE_TASK_READY */
@@ -2942,6 +3002,9 @@ static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w
case VINE_RESULT_RESOURCE_EXHAUSTION:
return resubmit_task_on_exhaustion(q, w, t);
break;
+ case VINE_RESULT_SANDBOX_EXHAUSTION:
+ return resubmit_task_on_sandbox_exhaustion(q, w, t);
+ break;
default:
/* by default tasks are not resumitted */
return 0;
@@ -3771,6 +3834,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->current_max_worker = rmsummary_create(-1);
q->max_task_resources_requested = rmsummary_create(-1);
+ q->sandbox_grow_factor = 2.0;
+
q->stats = calloc(1, sizeof(struct vine_stats));
q->stats_measure = calloc(1, sizeof(struct vine_stats));
@@ -3931,6 +3996,22 @@ int vine_disable_peer_transfers(struct vine_manager *q)
return 1;
}
+int vine_enable_proportional_resources(struct vine_manager *q)
+{
+ debug(D_VINE, "Proportional resources enabled");
+ q->proportional_resources = 1;
+ q->proportional_whole_tasks = 1;
+ return 1;
+}
+
+int vine_disable_proportional_resources(struct vine_manager *q)
+{
+ debug(D_VINE, "Proportional resources disabled");
+ q->proportional_resources = 0;
+ q->proportional_whole_tasks = 0;
+ return 1;
+}
+
int vine_enable_disconnect_slow_workers_category(struct vine_manager *q, const char *category, double multiplier)
{
struct category *c = vine_category_lookup_or_create(q, category);
@@ -4232,6 +4313,9 @@ char *vine_monitor_wrap(struct vine_manager *q, struct vine_worker_info *w, stru
buffer_printf(&b, " --interval %d", q->monitor_interval);
}
+ /* disable disk as it is measured throught the sandbox, otherwise we end up measuring twice. */
+ buffer_printf(&b, " --without-disk-footprint");
+
int extra_files = (q->monitor_mode & VINE_MON_FULL);
char *monitor_cmd = resource_monitor_write_command("./" RESOURCE_MONITOR_REMOTE_NAME,
@@ -4422,6 +4506,9 @@ const char *vine_result_string(vine_result_t result)
case VINE_RESULT_LIBRARY_EXIT:
str = "LIBRARY_EXIT";
break;
+ case VINE_RESULT_SANDBOX_EXHAUSTION:
+ str = "SANDBOX_EXHAUSTION";
+ break;
}
return str;
@@ -5075,6 +5162,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
timestamp_t current_time = timestamp_get();
if (current_time - q->time_last_large_tasks_check >= q->large_task_check_interval) {
q->time_last_large_tasks_check = current_time;
+ find_max_worker(q);
vine_schedule_check_for_large_tasks(q);
}
@@ -5380,7 +5468,11 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
q->prefer_dispatch = !!((int)value);
} else if (!strcmp(name, "force-proportional-resources") || !strcmp(name, "proportional-resources")) {
- q->proportional_resources = MAX(0, (int)value);
+ if (value > 0) {
+ vine_enable_proportional_resources(q);
+ } else {
+ vine_disable_proportional_resources(q);
+ }
} else if (!strcmp(name, "force-proportional-resources-whole-tasks") || !strcmp(name, "proportional-whole-tasks")) {
q->proportional_whole_tasks = MAX(0, (int)value);
@@ -5448,8 +5540,13 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if (!strcmp(name, "option-blocklist-slow-workers-timeout")) {
q->option_blocklist_slow_workers_timeout = MAX(0, value); /*todo: confirm 0 or 1*/
+
} else if (!strcmp(name, "watch-library-logfiles")) {
q->watch_library_logfiles = !!((int)value);
+
+ } else if (!strcmp(name, "sandbox-grow-factor")) {
+ q->sandbox_grow_factor = MAX(1.1, value);
+
} else {
debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name);
return -1;
@@ -5733,6 +5830,7 @@ void vine_accumulate_task(struct vine_manager *q, struct vine_task *t)
case VINE_RESULT_RESOURCE_EXHAUSTION:
case VINE_RESULT_MAX_WALL_TIME:
case VINE_RESULT_OUTPUT_TRANSFER_ERROR:
+ case VINE_RESULT_SANDBOX_EXHAUSTION:
if (category_accumulate_summary(c, t->resources_measured, q->current_max_worker)) {
vine_txn_log_write_category(q, c);
}
@@ -5752,11 +5850,15 @@ void vine_accumulate_task(struct vine_manager *q, struct vine_task *t)
break;
case VINE_RESULT_INPUT_MISSING:
case VINE_RESULT_OUTPUT_MISSING:
+ case VINE_RESULT_FIXED_LOCATION_MISSING:
+ case VINE_RESULT_CANCELLED:
+ case VINE_RESULT_RMONITOR_ERROR:
+ case VINE_RESULT_STDOUT_MISSING:
case VINE_RESULT_MAX_END_TIME:
case VINE_RESULT_UNKNOWN:
case VINE_RESULT_FORSAKEN:
case VINE_RESULT_MAX_RETRIES:
- default:
+ case VINE_RESULT_LIBRARY_EXIT:
break;
}
}
@@ -5839,7 +5941,6 @@ int vine_enable_category_resource(struct vine_manager *q, const char *category,
const struct rmsummary *vine_manager_task_resources_max(struct vine_manager *q, struct vine_task *t)
{
-
struct category *c = vine_category_lookup_or_create(q, t->category);
return category_task_max_resources(c, t->resources_requested, t->resource_request, t->task_id);
diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h
index 0ecc2d7bc7..8f753769f8 100644
--- a/taskvine/src/manager/vine_manager.h
+++ b/taskvine/src/manager/vine_manager.h
@@ -218,6 +218,8 @@ struct vine_manager {
int max_library_retries; /* The maximum time that a library can be failed and retry another one, if over this count the library template will be removed */
int watch_library_logfiles; /* If true, watch the output files produced by each of the library processes running on the remote workers, take them back the current logging directory */
+ double sandbox_grow_factor; /* When task disk sandboxes are exhausted, increase the allocation using their measured valued times this factor */
+
/*todo: confirm datatype. int or int64*/
int max_task_stdout_storage; /* Maximum size of standard output from task. (If larger, send to a separate file.) */
int max_new_workers; /* Maximum number of workers to add in a single cycle before dealing with other matters. */
diff --git a/taskvine/src/manager/vine_protocol.h b/taskvine/src/manager/vine_protocol.h
index bda88e43b4..7a3dca9fcd 100644
--- a/taskvine/src/manager/vine_protocol.h
+++ b/taskvine/src/manager/vine_protocol.h
@@ -13,7 +13,7 @@ worker, and catalog, but should not be visible to the public user API.
#ifndef VINE_PROTOCOL_H
#define VINE_PROTOCOL_H
-#define VINE_PROTOCOL_VERSION 10
+#define VINE_PROTOCOL_VERSION 11
#define VINE_LINE_MAX 4096 /**< Maximum length of a vine message line. */
diff --git a/taskvine/src/manager/vine_schedule.c b/taskvine/src/manager/vine_schedule.c
index 4dc3fccc16..1127e1e636 100644
--- a/taskvine/src/manager/vine_schedule.c
+++ b/taskvine/src/manager/vine_schedule.c
@@ -14,6 +14,7 @@ See the file COPYING for details.
#include "debug.h"
#include "hash_table.h"
#include "list.h"
+#include "macros.h"
#include "rmonitor_types.h"
#include "rmsummary.h"
@@ -132,6 +133,31 @@ int check_worker_have_enough_resources(struct vine_manager *q, struct vine_worke
return ok;
}
+/* t->disk only specifies the size of output and ephemeral files. Here we check if the task would fit together with all its input files
+ * taking into account that some files may be already at the worker. */
+int check_worker_have_enough_disk_with_inputs(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
+{
+ int ok = 1;
+ double available = w->resources->disk.total - MAX(0, t->resources_requested->disk) - w->resources->disk.inuse;
+
+ struct vine_mount *m;
+ LIST_ITERATE(t->input_mounts, m)
+ {
+ if (hash_table_lookup(w->current_files, m->file->cached_name)) {
+ continue;
+ }
+
+ available -= m->file->size;
+
+ if (available < 0) {
+ ok = 1;
+ break;
+ }
+ }
+
+ return ok;
+}
+
/* Check if this task is compatible with this given worker by considering
* resources availability, features, blocklist, and all other relevant factors.
* Used by all scheduling methods for basic compatibility.
@@ -147,7 +173,7 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
/* Otherwise library templates are modified during the run. */
/* worker has not reported any resources yet */
- if (w->resources->tag < 0 || w->resources->workers.total < 1) {
+ if (w->resources->tag < 0 || w->resources->workers.total < 1 || w->end_time < 0) {
return 0;
}
@@ -155,6 +181,10 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
if (w->draining) {
return 0;
}
+ // if worker's end time has not been received
+ if (w->end_time < 0) {
+ return 0;
+ }
/* Don't send tasks if a task recently failed at this worker. */
if (w->last_failure_time + q->transient_error_interval > timestamp_get()) {
@@ -182,11 +212,6 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
}
rmsummary_delete(l);
- // if worker's end time has not been received
- if (w->end_time < 0) {
- return 0;
- }
-
// if wall time for worker is specified and there's not enough time for task, then not ok
if (w->end_time > 0) {
double current_time = ((double)timestamp_get()) / ONE_SECOND;
@@ -198,6 +223,10 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
}
}
+ if (!check_worker_have_enough_disk_with_inputs(q, w, t)) {
+ return 0;
+ }
+
/* If the worker is not the one the task wants. */
if (t->has_fixed_locations && !check_fixed_location_worker(q, w, t)) {
return 0;
diff --git a/taskvine/src/manager/vine_task.c b/taskvine/src/manager/vine_task.c
index 0813b6c1bd..a2771ab1ee 100644
--- a/taskvine/src/manager/vine_task.c
+++ b/taskvine/src/manager/vine_task.c
@@ -72,6 +72,7 @@ struct vine_task *vine_task_create(const char *command_line)
t->resources_measured = rmsummary_create(-1);
t->resources_allocated = rmsummary_create(-1);
t->current_resource_box = 0;
+ t->input_files_size = -1;
t->refcount = 1;
t->output_received = 0;
diff --git a/taskvine/src/manager/vine_task.h b/taskvine/src/manager/vine_task.h
index bd882e13ef..c8a40217c4 100644
--- a/taskvine/src/manager/vine_task.h
+++ b/taskvine/src/manager/vine_task.h
@@ -64,6 +64,7 @@ struct vine_task {
int max_retries; /**< Number of times the task is tried to be executed on some workers until success. If less than one, the task is retried indefinitely. See try_count below.*/
int max_forsaken; /**< Number of times the task is submitted to workers without being executed. If less than one, the task is retried indefinitely. See forsaken_count below.*/
int64_t min_running_time; /**< Minimum time (in seconds) the task needs to run. (see vine_worker --wall-time)*/
+ int64_t input_files_size; /**< Size (in bytes) of input files. < 0 if the size of at least one of the input files is unknown. */
/***** Internal state of task as it works towards completion. *****/
@@ -121,6 +122,8 @@ struct vine_task {
struct rmsummary *resources_measured; /**< When monitoring is enabled, it points to the measured resources used by the task in its latest attempt. */
struct rmsummary *resources_requested; /**< Number of cores, disk, memory, time, etc. the task requires. */
struct rmsummary *current_resource_box; /**< Resources allocated to the task on this specific worker. */
+
+ double sandbox_measured; /**< On completion, the maximum size observed of the disk used by the task for output and ephemeral files. */
int has_fixed_locations; /**< Whether at least one file was added with the VINE_FIXED_LOCATION flag. Task fails immediately if no
worker can satisfy all the strict inputs of the task. */
diff --git a/taskvine/src/worker/vine_cache_file.c b/taskvine/src/worker/vine_cache_file.c
index dddf9357a0..3e364b3319 100644
--- a/taskvine/src/worker/vine_cache_file.c
+++ b/taskvine/src/worker/vine_cache_file.c
@@ -119,7 +119,7 @@ int vine_cache_file_measure_metadata(const char *path, int *mode, int64_t *size,
return 0;
/* Measure the size of the item recursively, if a directory. */
- result = path_disk_size_info_get(path, size, &nfiles);
+ result = path_disk_size_info_get(path, size, &nfiles, NULL);
if (result < 0)
return 0;
diff --git a/taskvine/src/worker/vine_process.c b/taskvine/src/worker/vine_process.c
index af56468eac..619f4cfa6a 100644
--- a/taskvine/src/worker/vine_process.c
+++ b/taskvine/src/worker/vine_process.c
@@ -13,6 +13,7 @@ See the file COPYING for details.
#include "vine_file.h"
#include "vine_mount.h"
#include "vine_worker.h"
+#include "vine_cache.h"
#include "change_process_title.h"
#include "create_dir.h"
@@ -48,6 +49,8 @@ See the file COPYING for details.
#include
#include
+extern struct vine_cache *cache_manager;
+
/*
Give the letter code used for the process sandbox dir.
*/
@@ -578,13 +581,15 @@ void vine_process_compute_disk_needed(struct vine_process *p)
}
}
+static int vine_process_sandbox_disk_measure(struct vine_process *p, int max_time_on_measurement, struct path_disk_size_info **state);
+
int vine_process_measure_disk(struct vine_process *p, int max_time_on_measurement)
{
/* we can't have pointers to struct members, thus we create temp variables here */
struct path_disk_size_info *state = p->disk_measurement_state;
- int result = path_disk_size_info_get_r(p->sandbox, max_time_on_measurement, &state);
+ int result = vine_process_sandbox_disk_measure(p, max_time_on_measurement, &state);
/* not a memory leak... Either disk_measurement_state was NULL or the same as state. */
p->disk_measurement_state = state;
@@ -600,4 +605,31 @@ int vine_process_measure_disk(struct vine_process *p, int max_time_on_measuremen
return result;
}
+static int vine_process_sandbox_disk_measure(struct vine_process *p, int max_secs, struct path_disk_size_info **state)
+{
+ int num_inputs = 0;
+ if (p->task->input_mounts) {
+ num_inputs = list_size(p->task->input_mounts);
+ }
+
+ struct hash_table *exclude_paths = NULL;
+ if (num_inputs > 0) {
+ exclude_paths = hash_table_create(2 * num_inputs, 0);
+
+ struct vine_mount *m;
+ LIST_ITERATE(p->task->input_mounts, m)
+ {
+ hash_table_insert(exclude_paths, m->remote_name, (void *)1);
+ }
+ }
+
+ int result = path_disk_size_info_get_r(p->sandbox, max_secs, state, exclude_paths);
+
+ if (exclude_paths) {
+ hash_table_delete(exclude_paths);
+ }
+
+ return result;
+}
+
/* vim: set noexpandtab tabstop=4: */
diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c
index bf1f2f2e07..1c0975fbc4 100644
--- a/taskvine/src/worker/vine_worker.c
+++ b/taskvine/src/worker/vine_worker.c
@@ -286,25 +286,27 @@ void send_complete_tasks(struct link *l)
output[p->output_length] = '\0';
close(output_file);
send_async_message(l,
- "complete %d %d %lld %lld %llu %llu %d\n%s",
+ "complete %d %d %lld %lld %llu %llu %d %d\n%s",
p->result,
p->exit_code,
(long long)p->output_length,
(long long)p->output_length,
(unsigned long long)p->execution_start,
(unsigned long long)p->execution_end,
+ p->sandbox_size,
p->task->task_id,
output);
free(output);
} else {
send_async_message(l,
- "complete %d %d %lld %lld %llu %llu %d\n",
+ "complete %d %d %lld %lld %llu %llu %d %d\n",
p->result,
p->exit_code,
(long long)p->output_length,
0,
(unsigned long long)p->execution_start,
(unsigned long long)p->execution_end,
+ p->sandbox_size,
p->task->task_id);
}
}
@@ -350,7 +352,7 @@ static int64_t measure_worker_disk()
return 0;
char *cache_dir = vine_cache_data_path(cache_manager, ".");
- path_disk_size_info_get_r(cache_dir, options->max_time_on_measurement, &state);
+ path_disk_size_info_get_r(cache_dir, options->max_time_on_measurement, &state, NULL);
free(cache_dir);
int64_t disk_measured = 0;
@@ -1100,7 +1102,7 @@ static void kill_all_tasks()
/* Check whether a given process is still within the various limits imposed on it. */
-static int enforce_process_limits(struct vine_process *p)
+static int enforce_process_sanbox_limits(struct vine_process *p)
{
/* If the task did not set disk usage, return right away. */
if (p->task->resources_requested->disk < 1)
@@ -1121,7 +1123,7 @@ static int enforce_process_limits(struct vine_process *p)
/* Check all processes to see whether they have exceeded various limits, and kill if necessary. */
-static int enforce_processes_limits()
+static int enforce_processes_sandbox_limits()
{
static time_t last_check_time = 0;
@@ -1136,8 +1138,8 @@ static int enforce_processes_limits()
ITABLE_ITERATE(procs_running, task_id, p)
{
- if (!enforce_process_limits(p)) {
- finish_running_task(p, VINE_RESULT_RESOURCE_EXHAUSTION);
+ if (!enforce_process_sanbox_limits(p)) {
+ finish_running_task(p, VINE_RESULT_SANDBOX_EXHAUSTION);
trash_file(p->sandbox);
ok = 0;
@@ -1682,7 +1684,7 @@ static void vine_worker_serve_manager(struct link *manager)
/* end a running processes if goes above its declared limits.
* Mark offending process as RESOURCE_EXHASTION. */
- enforce_processes_limits();
+ enforce_processes_sandbox_limits();
/* end running processes if worker resources are exhasusted, and marked
* them as FORSAKEN, so they can be resubmitted somewhere else. */
diff --git a/taskvine/test/TR_vine_single.sh b/taskvine/test/TR_vine_single.sh
index 1a0e1de2c5..0b0b41caf3 100755
--- a/taskvine/test/TR_vine_single.sh
+++ b/taskvine/test/TR_vine_single.sh
@@ -1,6 +1,7 @@
#!/bin/sh
CORES=1
+DISK=5000
TASKS=10
. ./vine_common.sh
diff --git a/taskvine/test/vine_allocations.py b/taskvine/test/vine_allocations.py
index 783762b9e4..b09e7a0cf3 100755
--- a/taskvine/test/vine_allocations.py
+++ b/taskvine/test/vine_allocations.py
@@ -77,7 +77,7 @@ def check_task(category, category_mode, max, min, expected):
q.tune("force-proportional-resources", 1)
check_task("only_memory", "fixed", max={"memory": worker_memory / 2}, min={}, expected={"cores": worker_cores / 2, "memory": worker_memory / 2, "disk": worker_disk / 2, "gpus": 0})
- check_task("only_memory_w_minimum", "fixed", max={"memory": worker_memory / 2}, min={"cores": 3, "gpus": 2}, expected={"cores": 3, "memory": worker_memory / 2, "disk": worker_disk / 2, "gpus": 2})
+ check_task("only_memory_w_minimum", "fixed", max={"memory": worker_memory / 2}, min={"cores": 3, "gpus": 2}, expected={"cores": 4, "memory": worker_memory, "disk": worker_disk, "gpus": 2})
check_task("only_cores", "fixed", max={"cores": worker_cores}, min={}, expected={"cores": worker_cores, "memory": worker_memory, "disk": worker_disk, "gpus": 0})
diff --git a/taskvine/test/vine_common.sh b/taskvine/test/vine_common.sh
index d42b4cb2bd..c394ecf6a2 100755
--- a/taskvine/test/vine_common.sh
+++ b/taskvine/test/vine_common.sh
@@ -26,7 +26,7 @@ EOF
port=`cat master.port`
echo "starting worker"
- ../src/worker/vine_worker -o worker.log -d all localhost $port -b 1 --timeout 20 --cores $CORES --memory 50 --single-shot
+ ../src/worker/vine_worker -o worker.log -d all localhost $port -b 1 --timeout 20 --cores ${CORES:-1} --memory ${MEMORY:-250} --disk ${DISK:-2000} --single-shot
echo "checking for output"
i=0
diff --git a/work_queue/src/work_queue_process.c b/work_queue/src/work_queue_process.c
index 763b3101c2..e611fe852a 100644
--- a/work_queue/src/work_queue_process.c
+++ b/work_queue/src/work_queue_process.c
@@ -377,7 +377,7 @@ int work_queue_process_measure_disk(struct work_queue_process *p, int max_time_o
struct path_disk_size_info *state = p->disk_measurement_state;
- int result = path_disk_size_info_get_r(p->sandbox, max_time_on_measurement, &state);
+ int result = path_disk_size_info_get_r(p->sandbox, max_time_on_measurement, &state, NULL);
/* not a memory leak... Either disk_measurement_state was NULL or the same as state. */
p->disk_measurement_state = state;
diff --git a/work_queue/src/work_queue_worker.c b/work_queue/src/work_queue_worker.c
index 7bb5eb68a3..36c6d3223f 100644
--- a/work_queue/src/work_queue_worker.c
+++ b/work_queue/src/work_queue_worker.c
@@ -288,7 +288,7 @@ static int64_t measure_worker_disk()
{
static struct path_disk_size_info *state = NULL;
- path_disk_size_info_get_r("./cache", max_time_on_measurement, &state);
+ path_disk_size_info_get_r("./cache", max_time_on_measurement, &state, NULL);
int64_t disk_measured = 0;
if(state->last_byte_size_complete >= 0) {