vine: measure and report worker sandbox used (cooperative-computing-lab#3889)

* measure sandbox on completion and report to manager

* fix disk check bug

* remove task inputs from sandbox measurement

* remove debug

* update measurement strategy

* format

* refer to sandbox usage for category minimum

* format

* move max_disk_use to vine_stats->max_sandbox

* use hash_table to encode excluded paths

* minimum disk requested is always at least input size

* do not add one to sandbox_used

* pad with at least bucket_size/2 as we do for first allocations

* update wq

* treat sandbox as any other disk allocation

* add VINE_RESULT_SANDBOX_EXHAUSTION

* use padded sandbox as min

* treat sandboxes differently than regular allocations

otherwise FIXED allocations that did not specify disk are not retried.

* format

* add API call to enable/disable proportional resources

* api

* consider min resources for proportion

* use user values

* disable whole task proportion if specific resource was given

* rename max_sandbox -> min_sandbox

* do not double measure disk

* set minimum disk usage from sandbox

* update comments

* disk only specified output and ephemeral files

* proportion with available disk - cached

* use already inuse_cache value in count_worker_resources

* use available_disk with t->input_files_size

* turn off prop whole tasks only for mem and disk

* check for user resources not specified

* fix conflict, input_size

* format

* macros to convert bytes to megabytes, etc.

* correctly account for inuse_cache units

* add DISK to vine_worker test

* format

---------

Co-authored-by: Benjamin Tovar <[email protected]>
colinthomas-z80 and btovar authored Sep 30, 2024
1 parent 5f5bc96 commit 24520c9
Showing 23 changed files with 301 additions and 77 deletions.
6 changes: 3 additions & 3 deletions doc/manuals/taskvine/index.md
@@ -1928,8 +1928,8 @@ Consider now that the task requires 1 cores, 6GB of memory, and 27 GB of disk:

!!! note
If you want TaskVine to exactly allocate the resources you have
-specified, use the `proportional-resources` and `proportional-whole-tasks`
-parameters as shown [here](#tuning-specialized-execution-parameters). In
+specified, use `m.disable_proportional_resources()` (see also `proportional-whole-tasks`
+[here](#tuning-specialized-execution-parameters)). In
general, however, we have found that using proportions nicely adapts to the
underlying available resources, and leads to very few resource exhaustion
failures while still using worker resources efficiently.
@@ -2535,10 +2535,10 @@ change.
| min-transfer-timeout | Set the minimum number of seconds to wait for files to be transferred to or from a worker. | 10 |
| monitor-interval | Maximum number of seconds between resource monitor measurements. If less than 1, use default. | 5 |
| prefer-dispatch | If 1, try to dispatch tasks even if there are retrieved tasks ready to be reported as done. | 0 |
| proportional-resources | If set to 0, do not assign resources proportionally to tasks. The default is to use proportions. (See [task resources.](#task-resources)) | 1 |
| proportional-whole-tasks | Round up resource proportions such that only an integer number of tasks could be fit in the worker. The default is to use proportions. (See [task resources.](#task-resources)) | 1 |
| ramp-down-heuristic | If set to 1 and there are more workers than tasks waiting, then tasks are allocated all the free resources of a worker large enough to run them. If monitoring watchdog is not enabled, then this heuristic has no effect. | 0 |
| resource-submit-multiplier | Assume that workers have `resource x resource-submit-multiplier` available.<br> This overcommits resources at the worker, causing tasks to be sent to workers that cannot be immediately executed.<br>The extra tasks wait at the worker until resources become available. | 1 |
| sandbox-grow-factor | When task disk sandboxes are exhausted, increase the allocation using their measured value times this factor. Minimum is 1.1. | 2 |
| short-timeout | Set the minimum timeout in seconds when sending a brief message to a single worker. | 5 |
| temp-replica-count | Number of temp file replicas created across workers | 0 |
| transfer-outlier-factor | Transfers that are this many times slower than the average will be terminated. | 10 |
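
Reviewer sketch for the `sandbox-grow-factor` row: the table says an exhausted sandbox is retried with the measured usage times the factor, with the factor never below 1.1. The C fragment below illustrates that arithmetic only. The function names are hypothetical, and rounding up to whole megabytes is an assumption that the table does not state.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: enforce the documented minimum factor of 1.1. */
static double sketch_clamp_grow_factor(double factor)
{
	return factor < 1.1 ? 1.1 : factor;
}

/* Hypothetical helper: next sandbox size in MB after an exhaustion,
 * computed as measured usage times the clamped factor.
 * Rounding up to a whole MB is an assumption of this sketch. */
static int64_t sketch_next_sandbox_mb(int64_t measured_mb, double factor)
{
	assert(measured_mb >= 0);
	double grown = (double)measured_mb * sketch_clamp_grow_factor(factor);
	int64_t whole = (int64_t)grown;
	return grown > (double)whole ? whole + 1 : whole;
}
```

With the default factor of 2, a task measured at 100 MB would be retried with 200 MB under these assumptions.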
5 changes: 5 additions & 0 deletions dttools/src/category.c
@@ -921,6 +921,11 @@ const struct rmsummary *category_task_min_resources(struct category *c, struct r
/* but don't go below the minimum defined for the category. */
rmsummary_merge_max(internal, c->min_allocation);

/* nor below the observed sandboxes if not in an auto mode */
if (c->allocation_mode == CATEGORY_ALLOCATION_MODE_FIXED && user && user->disk < 0) {
internal->disk = MAX(internal->disk, c->min_vine_sandbox);
}

return internal;
}
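
Reviewer sketch of the rule added in this hunk: in fixed allocation mode, when the user summary exists but leaves disk unspecified (a negative value), the minimum disk is raised to the largest sandbox observed for the category. The types and names below are hypothetical stand-ins rather than the `category` or `rmsummary` structures, and only the disk field is modelled.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the category allocation modes. */
typedef enum {
	SKETCH_ALLOCATION_MODE_FIXED,
	SKETCH_ALLOCATION_MODE_AUTO
} sketch_allocation_mode_t;

/* Return the minimum disk (MB) for a task.
 * user_disk_mb may be NULL (no user summary); a negative value means
 * "disk not specified", mirroring the user->disk < 0 test in the hunk. */
static int64_t sketch_min_disk_mb(sketch_allocation_mode_t mode, int64_t internal_disk_mb, const int64_t *user_disk_mb, int64_t min_sandbox_mb)
{
	assert(min_sandbox_mb >= 0);
	if (mode == SKETCH_ALLOCATION_MODE_FIXED && user_disk_mb && *user_disk_mb < 0) {
		return internal_disk_mb > min_sandbox_mb ? internal_disk_mb : min_sandbox_mb;
	}
	return internal_disk_mb;
}
```

An explicit user value, a missing user summary, or a non-fixed mode all leave the internal minimum untouched in this sketch, which matches the condition shown above.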

3 changes: 3 additions & 0 deletions dttools/src/category.h
@@ -104,6 +104,9 @@ struct category {
/* stats for taskvine */
struct vine_stats *vine_stats;

/* Max sandbox disk space observed, in MB. This is the minimum sandbox size needed if nothing else is known about the task.*/
int64_t min_vine_sandbox;

/* variables for makeflow */
/* Mappings between variable names defined in the makeflow file and their values. */
struct hash_table *mf_variables;
7 changes: 7 additions & 0 deletions dttools/src/macros.h
@@ -39,6 +39,13 @@ See the file COPYING for details.
#define TERABYTE TERA
#define PETABYTE PETA

#define BYTES_TO_STORAGE_UNIT(x, unit) (((double) x) / unit)
#define BYTES_TO_KILOBYTES(x) BYTES_TO_STORAGE_UNIT(x, KILOBYTE)
#define BYTES_TO_MEGABYTES(x) BYTES_TO_STORAGE_UNIT(x, MEGABYTE)
#define BYTES_TO_GIGABYTES(x) BYTES_TO_STORAGE_UNIT(x, GIGABYTE)
#define BYTES_TO_TERABYTES(x) BYTES_TO_STORAGE_UNIT(x, TERABYTE)
#define BYTES_TO_PETABYTES(x) BYTES_TO_STORAGE_UNIT(x, PETABYTE)

#define USECOND 1000000

#endif
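
Reviewer sketch of how conversion macros like these can be used. The `SKETCH_` names and the value of one megabyte (1024 * 1024 bytes) are assumptions for illustration, since the definitions of `KILO`, `MEGA`, and friends are outside this hunk. The sketch also parenthesizes the macro arguments so that the cast covers a whole expression such as `a + b`; that is a defensive choice of the sketch, not a change made by the commit.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed unit size; the real value comes from dttools/src/macros.h. */
#define SKETCH_MEGABYTE (1024.0 * 1024.0)

/* Same shape as BYTES_TO_STORAGE_UNIT, with arguments parenthesized. */
#define SKETCH_BYTES_TO_STORAGE_UNIT(x, unit) (((double)(x)) / (unit))
#define SKETCH_BYTES_TO_MEGABYTES(x) SKETCH_BYTES_TO_STORAGE_UNIT((x), SKETCH_MEGABYTE)

/* Round a byte count up to whole megabytes without needing libm. */
static int64_t sketch_bytes_to_whole_megabytes(int64_t bytes)
{
	assert(bytes >= 0);
	double mb = SKETCH_BYTES_TO_MEGABYTES(bytes);
	int64_t whole = (int64_t)mb;
	return mb > (double)whole ? whole + 1 : whole;
}
```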
10 changes: 7 additions & 3 deletions dttools/src/path_disk_size_info.c
@@ -23,15 +23,15 @@ struct DIR_with_name {
char *name;
};

-int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files)
+int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files, struct hash_table *exclude_paths)
{

struct stat info;
int result = stat(path, &info);
if (result == 0) {
if (S_ISDIR(info.st_mode)) {
struct path_disk_size_info *state = NULL;
-result = path_disk_size_info_get_r(path, -1, &state);
+result = path_disk_size_info_get_r(path, -1, &state, exclude_paths);

*measured_size = state->last_byte_size_complete;
*number_of_files = state->last_file_count_complete;
@@ -46,7 +46,7 @@ int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *n
return result;
}

-int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state)
+int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state, struct hash_table *exclude_paths)
{
int64_t start_time = time(0);
int result = 0;
@@ -115,6 +115,10 @@ int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_di
snprintf(composed_path, PATH_MAX, "%s/%s", tail->name, entry->d_name);
}

if (exclude_paths && hash_table_lookup(exclude_paths, composed_path)) {
continue;
}

if (lstat(composed_path, &file_info) < 0) {
if (errno == ENOENT) {
/* our DIR structure is stale, and a file went away. We simply do nothing. */
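
Reviewer sketch of the exclusion idea in this hunk: each composed path is checked against the excluded set before it is measured, so task inputs can be left out of the sandbox total. This is a simplified recursive walk, not the incremental, time-bounded traversal used by `path_disk_size_info_get_r`. The function names are hypothetical, and a NULL-terminated string array stands in for the hash table.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L /* lstat and PATH_MAX under strict C modes */
#endif

#include <assert.h>
#include <dirent.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>

/* Hypothetical stand-in for hash_table_lookup: linear search over a
 * NULL-terminated array of paths. A NULL array excludes nothing. */
static int sketch_is_excluded(const char *path, const char *const *exclude_paths)
{
	if (!exclude_paths)
		return 0;
	for (size_t i = 0; exclude_paths[i]; i++) {
		if (strcmp(path, exclude_paths[i]) == 0)
			return 1;
	}
	return 0;
}

/* Add the sizes of non-directory entries under path to *bytes and count
 * every visited entry in *entries, skipping excluded paths.
 * Returns 0 on success, -1 if a directory cannot be opened or a path is too long. */
static int sketch_measure(const char *path, const char *const *exclude_paths, int64_t *bytes, int64_t *entries)
{
	assert(path && bytes && entries);
	DIR *dir = opendir(path);
	if (!dir)
		return -1;

	int result = 0;
	struct dirent *entry;
	while ((entry = readdir(dir))) {
		if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, ".."))
			continue;

		char composed_path[PATH_MAX];
		int n = snprintf(composed_path, sizeof(composed_path), "%s/%s", path, entry->d_name);
		if (n < 0 || (size_t)n >= sizeof(composed_path)) {
			result = -1;
			break;
		}

		/* Skip excluded entries before calling lstat on them. */
		if (sketch_is_excluded(composed_path, exclude_paths))
			continue;

		struct stat info;
		if (lstat(composed_path, &info) < 0)
			continue; /* entry went away or is unreadable; ignored in this sketch */

		*entries += 1;
		if (S_ISDIR(info.st_mode)) {
			if (sketch_measure(composed_path, exclude_paths, bytes, entries) < 0) {
				result = -1;
				break;
			}
		} else {
			*bytes += info.st_size;
		}
	}

	closedir(dir);
	return result;
}
```

Because the comparison is on the full composed path, the excluded entries must be spelled the same way the walk composes them (directory, a slash, then the entry name).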
8 changes: 6 additions & 2 deletions dttools/src/path_disk_size_info.h
@@ -9,6 +9,7 @@ See the file COPYING for details.

#include "int_sizes.h"
#include "list.h"
#include "hash_table.h"

struct path_disk_size_info {
int complete_measurement;
@@ -29,9 +30,10 @@ Query disk space on the given directory.
@param path Directory to be measured.
@param *measured_size A pointer to an integer that will be filled with the total space in bytes.
@param *number_of_files A pointer to an integer that will be filled with the total number of files, directories, and symbolic links.
@param exclude_paths Hash table with strings of paths that should not be measured. Values of the hash table are ignored.
@return zero on success, -1 if an error is encountered (see errno).
*/
-int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files);
+int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files, struct hash_table *exclude_paths);

/** Get a (perhaps partial) disk usage on path, but measure by max_secs at a time.
If *state is NULL, start a new measurement, otherwise continue from
@@ -40,9 +42,11 @@ When the function returns, if *state->complete_measurement is 1, then the measur
@param path Directory to be measured.
@param max_secs Maximum number of seconds to spend in the measurement.
@param *state State of the measurement.
@param exclude_paths Hash table with strings of paths that should not be measured. Values of the hash table are ignored.
@return zero on success, -1 if an error is encountered (see errno).
*/
-int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state);
+int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state, struct hash_table *exclude_paths);


void path_disk_size_info_delete_state(struct path_disk_size_info *state);

4 changes: 2 additions & 2 deletions dttools/src/rmonitor_poll.c
@@ -780,7 +780,7 @@ int rmonitor_get_wd_usage(struct rmonitor_wdir_info *d, int max_time_for_measure
{
/* We need a pointer to a pointer, which is not possible from a struct. Use a dummy variable. */
struct path_disk_size_info *state = d->state;
-int status = path_disk_size_info_get_r(d->path, max_time_for_measurement, &state);
+int status = path_disk_size_info_get_r(d->path, max_time_for_measurement, &state, NULL);

d->state = state;

@@ -945,7 +945,7 @@ struct rmsummary *rmonitor_measure_host(char *path)
struct rmsummary *tr = rmsummary_create(-1);

if (path) {
-path_disk_size_info_get(path, &total_disk, &file_count);
+path_disk_size_info_get(path, &total_disk, &file_count, NULL);
tr->disk = ((double)total_disk) / ONE_MEGABYTE;
tr->total_files = file_count;
}
18 changes: 17 additions & 1 deletion taskvine/src/manager/taskvine.h
@@ -105,7 +105,8 @@ typedef enum {
VINE_RESULT_FIXED_LOCATION_MISSING = 10 << 3, /**< The task failed because no worker could satisfy the fixed
location input file requirements. */
VINE_RESULT_CANCELLED = 11 << 3, /**< The task was cancelled by the caller. */
-VINE_RESULT_LIBRARY_EXIT = 12 << 3 /**< Task is a library that has terminated. **/
+VINE_RESULT_LIBRARY_EXIT = 12 << 3, /**< Task is a library that has terminated. **/
+VINE_RESULT_SANDBOX_EXHAUSTION = 13 << 3 /**< The task used more disk than the allowed sandbox. **/
} vine_result_t;

/** Select how to allocate resources for similar tasks with @ref vine_set_category_mode */
@@ -1088,6 +1089,21 @@ int vine_enable_peer_transfers(struct vine_manager *m);
/** Disable taskvine peer transfers to be scheduled by the manager **/
int vine_disable_peer_transfers(struct vine_manager *m);

/** When enabled, resources are assigned to tasks in proportion to the size
of the worker. If a resource is specified (e.g. with @ref vine_task_set_cores),
proportional resources never go below explicit specifications. This mode is most
useful when only some of the resources are explicitly specified, or
with automatic resource allocation. By default it is enabled.
@param m A manager object
**/
int vine_enable_proportional_resources(struct vine_manager *m);

/** Disable proportional resources. See @ref vine_enable_proportional_resources.
* Proportional resources are enabled by default.
@param m A manager object
**/
int vine_disable_proportional_resources(struct vine_manager *m);

/** Set the minimum task_id of future submitted tasks.
Further submitted tasks are guaranteed to have a task_id larger or equal to
minid. This function is useful to make task_ids consistent in a workflow that
14 changes: 14 additions & 0 deletions taskvine/src/manager/vine_file_replica_table.c
@@ -4,6 +4,8 @@ Copyright (C) 2022- The University of Notre Dame
See the file COPYING for details.
*/

#include <math.h>

#include "vine_file_replica_table.h"
#include "set.h"
#include "vine_blocklist.h"
@@ -25,6 +27,12 @@ int vine_file_replica_table_insert(struct vine_manager *m, struct vine_worker_in
w->inuse_cache += replica->size;
hash_table_insert(w->current_files, cachename, replica);

double prev_available = w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache + replica->size));
if (prev_available >= m->current_max_worker->disk) {
/* the current worker may have been the one with the maximum available space, so we update it. */
m->current_max_worker->disk = w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache));
}

struct set *workers = hash_table_lookup(m->file_worker_table, cachename);
if (!workers) {
workers = set_create(4);
@@ -44,6 +52,12 @@ struct vine_file_replica *vine_file_replica_table_remove(struct vine_manager *m,
w->inuse_cache -= replica->size;
}

double available = w->resources->disk.total - BYTES_TO_MEGABYTES(w->inuse_cache);
if (available > m->current_max_worker->disk) {
/* the current worker has more space than we knew before for all workers, so we update it. */
m->current_max_worker->disk = available;
}

struct set *workers = hash_table_lookup(m->file_worker_table, cachename);

if (workers) {
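
Reviewer sketch of the bookkeeping in the removal hunk above: the cache usage is tracked in bytes while the worker's disk total is in megabytes, so the available space is computed after converting units, and the largest known available disk is raised when this worker now exceeds it. The struct, the function name, and the megabyte size are hypothetical stand-ins for the manager's own structures.

```c
#include <assert.h>
#include <stdint.h>

/* Assumed unit size; the real value comes from dttools/src/macros.h. */
#define SKETCH_MEGABYTE (1024.0 * 1024.0)

/* Hypothetical, reduced view of a worker. */
struct sketch_worker {
	int64_t disk_total_mb; /* total disk reported by the worker, in MB */
	int64_t inuse_cache;   /* bytes currently held by cached files */
};

/* Release size_bytes from the worker's cache and return the updated
 * largest-known available disk (in MB) across workers. */
static double sketch_release_cache(struct sketch_worker *w, int64_t size_bytes, double current_max_mb)
{
	assert(w && size_bytes >= 0 && size_bytes <= w->inuse_cache);
	w->inuse_cache -= size_bytes;

	/* Convert bytes to MB before subtracting from the MB total. */
	double available_mb = (double)w->disk_total_mb - (double)w->inuse_cache / SKETCH_MEGABYTE;
	return available_mb > current_max_mb ? available_mb : current_max_mb;
}
```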