diff --git a/.github/workflows/h5bench-hdf5-1.10.4.yml b/.github/workflows/h5bench-hdf5-1.10.4.yml index 712eb4e9..2f7d0090 100644 --- a/.github/workflows/h5bench-hdf5-1.10.4.yml +++ b/.github/workflows/h5bench-hdf5-1.10.4.yml @@ -1,5 +1,8 @@ name: h5bench (HDF5 1.10.4) +env: + ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true + on: pull_request: @@ -217,6 +220,13 @@ jobs: cd build ./h5bench --debug --abort-on-failure ../samples/sync-macsio.json + - name: Test h5bench SYNC dlio + run: | + export LD_LIBRARY_PATH=/opt/hdf5/lib:$LD_LIBRARY_PATH + + cd build + ./h5bench --debug --abort-on-failure ../samples/sync-dlio.json + - name: Setup tmate session if: ${{ failure() }} uses: mxschmitt/action-tmate@v3 diff --git a/.github/workflows/h5bench-hdf5-1.10.7.yml b/.github/workflows/h5bench-hdf5-1.10.7.yml index 8834b81b..f0631541 100644 --- a/.github/workflows/h5bench-hdf5-1.10.7.yml +++ b/.github/workflows/h5bench-hdf5-1.10.7.yml @@ -1,5 +1,8 @@ name: h5bench (HDF5 1.10.7) +env: + ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true + on: pull_request: @@ -217,6 +220,13 @@ jobs: cd build ./h5bench --debug --abort-on-failure ../samples/sync-macsio.json + - name: Test h5bench SYNC dlio + run: | + export LD_LIBRARY_PATH=/opt/hdf5/lib:$LD_LIBRARY_PATH + + cd build + ./h5bench --debug --abort-on-failure ../samples/sync-dlio.json + - name: Setup tmate session if: ${{ failure() }} uses: mxschmitt/action-tmate@v3 diff --git a/.github/workflows/h5bench-hdf5-1.10.8.yml b/.github/workflows/h5bench-hdf5-1.10.8.yml index 3c2fc5f1..d31e6666 100644 --- a/.github/workflows/h5bench-hdf5-1.10.8.yml +++ b/.github/workflows/h5bench-hdf5-1.10.8.yml @@ -1,5 +1,8 @@ name: h5bench (HDF5 1.10.8) +env: + ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true + on: pull_request: @@ -217,13 +220,20 @@ jobs: cd build ./h5bench --debug --abort-on-failure ../samples/sync-macsio.json + - name: Test h5bench SYNC dlio + run: | + export LD_LIBRARY_PATH=/opt/hdf5/lib:$LD_LIBRARY_PATH + + cd build + ./h5bench --debug --abort-on-failure ../samples/sync-dlio.json + - name: Setup tmate session if: ${{ failure() }} uses: mxschmitt/action-tmate@v3 - name: Upload artifact if: always() - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 with: name: test path: build/storage/**/std* diff --git a/.github/workflows/h5bench-hdf5-1.12.0.yml b/.github/workflows/h5bench-hdf5-1.12.0.yml index d194ae41..a445af9e 100644 --- a/.github/workflows/h5bench-hdf5-1.12.0.yml +++ b/.github/workflows/h5bench-hdf5-1.12.0.yml @@ -252,6 +252,13 @@ jobs: cd build ./h5bench --debug --abort-on-failure ../samples/sync-macsio.json + - name: Test h5bench SYNC dlio + run: | + export LD_LIBRARY_PATH=/opt/hdf5/lib:$LD_LIBRARY_PATH + + cd build + ./h5bench --debug --abort-on-failure ../samples/sync-dlio.json + - name: Setup tmate session if: ${{ failure() }} uses: mxschmitt/action-tmate@v3 diff --git a/.github/workflows/h5bench-hdf5-1.14.0.yml b/.github/workflows/h5bench-hdf5-1.14.0.yml index b3be7393..361961cf 100644 --- a/.github/workflows/h5bench-hdf5-1.14.0.yml +++ b/.github/workflows/h5bench-hdf5-1.14.0.yml @@ -284,6 +284,13 @@ jobs: cd build-sync ./h5bench --debug --abort-on-failure ../samples/sync-macsio.json + - name: Test h5bench SYNC dlio + run: | + export LD_LIBRARY_PATH=/opt/hdf5/lib:$LD_LIBRARY_PATH + + cd build-sync + ./h5bench --debug --abort-on-failure ../samples/sync-dlio.json + - name: Test h5bench ASYNC write/read run: | export HDF5_DIR=/opt/hdf5 diff --git a/.github/workflows/h5bench-hdf5-1.14.1.yml b/.github/workflows/h5bench-hdf5-1.14.1.yml index 
2de7aceb..c0e46e2e 100644 --- a/.github/workflows/h5bench-hdf5-1.14.1.yml +++ b/.github/workflows/h5bench-hdf5-1.14.1.yml @@ -284,6 +284,13 @@ jobs: cd build-sync ./h5bench --debug --abort-on-failure ../samples/sync-macsio.json + - name: Test h5bench SYNC dlio + run: | + export LD_LIBRARY_PATH=/opt/hdf5/lib:$LD_LIBRARY_PATH + + cd build-sync + ./h5bench --debug --abort-on-failure ../samples/sync-dlio.json + - name: Test h5bench ASYNC write/read run: | export HDF5_DIR=/opt/hdf5 diff --git a/.github/workflows/h5bench-hdf5-develop.yml b/.github/workflows/h5bench-hdf5-develop.yml index b68c92e2..a14bdf53 100644 --- a/.github/workflows/h5bench-hdf5-develop.yml +++ b/.github/workflows/h5bench-hdf5-develop.yml @@ -416,6 +416,15 @@ jobs: cd build-sync ./h5bench --debug --abort-on-failure ../samples/sync-macsio.json + - name: Test h5bench SYNC dlio + run: | + export HDF5_HOME=/opt/hdf5 + + export LD_LIBRARY_PATH=$HDF5_HOME/lib:$LD_LIBRARY_PATH + + cd build-sync + ./h5bench --debug --abort-on-failure ../samples/sync-dlio.json + - name: Test h5bench ASYNC write/read run: | export HDF5_DIR=/opt/hdf5 diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 2360447c..55b39a87 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -80,6 +80,9 @@ test-perlmutter: - echo "Test h5bench SYNC openpmd" - python3 ../../samples/update-perlmutter.py ../../samples/sync-openpmd.json - ./h5bench --debug --abort-on-failure ../../samples/sync-openpmd.json + - echo "Test h5bench SYNC dlio" + - python3 ../../samples/update-perlmutter.py ../../samples/sync-dlio.json + - ./h5bench --debug --abort-on-failure ../../samples/sync-dlio.json build-perlmutter-metrics: stage: build diff --git a/CMakeLists.txt b/CMakeLists.txt index cf22a019..b03f5f2c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,6 +29,7 @@ option(H5BENCH_AMREX "Enable AMReX benchmark" OFF) option(H5BENCH_OPENPMD "Enable OpenPMD benchmark" OFF) option(H5BENCH_E3SM "Enable E3SM benchmark" OFF) option(H5BENCH_MACSIO "Enable MACSio benchmark" OFF) +option(H5BENCH_DLIO "Enable DLIO benchmark" OFF) message(STATUS "h5bench baseline: ON") @@ -41,6 +42,7 @@ if(H5BENCH_ALL) set(H5BENCH_OPENPMD ON) set(H5BENCH_E3SM ON) set(H5BENCH_MACSIO ON) + set(H5BENCH_DLIO ON) endif() message(STATUS "h5bench METADATA: ${H5BENCH_METADATA}") @@ -49,6 +51,7 @@ message(STATUS "h5bench AMREX: ${H5BENCH_AMREX}") message(STATUS "h5bench OPENPMD: ${H5BENCH_OPENPMD}") message(STATUS "h5bench E3SM: ${H5BENCH_E3SM}") message(STATUS "h5bench MACSIO: ${H5BENCH_MACSIO}") +message(STATUS "h5bench DLIO: ${H5BENCH_DLIO}") # HDF5 Dependency ############################################################# # @@ -328,6 +331,21 @@ configure_file(${CMAKE_SOURCE_DIR}/src/h5bench.py ${CMAKE_BINARY_DIR}/h5bench CO configure_file(${CMAKE_SOURCE_DIR}/src/h5bench_version.py ${CMAKE_BINARY_DIR}/h5bench_version.py COPYONLY) configure_file(${CMAKE_SOURCE_DIR}/src/h5bench_configuration.py.in ${CMAKE_BINARY_DIR}/h5bench_configuration.py) +# DLIO ###################################################################### +# +# https://github.com/arcturus5340/h5bench + +if(H5BENCH_DLIO) + set(h5bench_dlio_src + dlio/h5bench_dlio.c + dlio/utils.c + dlio/stats.c + dlio/workers.c) + + add_executable(h5bench_dlio ${h5bench_dlio_src}) + target_link_libraries(h5bench_dlio h5bench_util hdf5 m MPI::MPI_C) +endif() + # Install binaries ############################################################ install( @@ -419,6 +437,14 @@ if(H5BENCH_MACSIO) ) endif() +if(H5BENCH_DLIO) + install( + TARGETS + h5bench_dlio + DESTINATION bin + ) +endif() 
+ # Testing ##################################################################### add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/tests) diff --git a/commons/h5bench_util.h b/commons/h5bench_util.h index 5fc8adb4..d787a274 100644 --- a/commons/h5bench_util.h +++ b/commons/h5bench_util.h @@ -7,6 +7,7 @@ #ifndef COMMONS_H5BENCH_UTIL_H_ #define COMMONS_H5BENCH_UTIL_H_ +#include <stdint.h> #define DEBUG_PRINT \ printf("%s:%d\n", __func__, __LINE__); \ diff --git a/dlio/README.md b/dlio/README.md new file mode 100644 index 00000000..1f8f8043 --- /dev/null +++ b/dlio/README.md @@ -0,0 +1,129 @@ +# DLIO benchmark
+
+The benchmark is designed to measure the performance of training and evaluating deep learning models on data stored
+as HDF5 files. Based on I/O patterns collected and analyzed from [DLIO Benchmark](https://github.com/argonne-lcf/dlio_benchmark),
+this benchmark simulates the training and evaluation of deep learning models that use the PyTorch and TensorFlow
+frameworks, while gathering valuable information about system performance. Most importantly, this extension allows users
+to test AI workloads without having to install machine learning libraries, reducing complexity and enhancing the
+usability of the benchmark. Another advantage is that, in our experiments, this extension ran faster than DLIO Benchmark,
+which we suspect is due to the lower overhead of the C application in our extension compared to the
+Python application in the original benchmark. While the quicker runtime can be beneficial for faster testing, it also
+suggests that the benchmark might not fully capture the complexity of real AI workloads, such as the heavy metadata
+operations introduced by the use of Python-based libraries. The I/O pattern produced by this extension is based on the
+implementation of [DLIO benchmark version 1.1](https://github.com/argonne-lcf/dlio_benchmark/releases/tag/v1.1).
+Changes to the main DLIO Benchmark configurations after version 1.1 are not reflected in this h5bench pattern. To
+reproduce them, DLIO Benchmark behavior can be studied using various I/O analysis tools; we recommend the
+[Log VFD](https://docs.hdfgroup.org/hdf5/v1_14/group___f_a_p_l.html#ga4e03be2fe83ed02b32266a6c81427beb).
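+
+As a quick orientation, building and running follows the same flow as the other h5bench extensions. A minimal
+sketch, assuming an MPI-enabled HDF5 installation under `/opt/hdf5` as in the CI workflows above (paths and
+options will differ on other systems):
+
+```
+# enable the extension when configuring h5bench (or use -DH5BENCH_ALL=ON)
+cmake .. -DH5BENCH_DLIO=ON
+make
+
+# run the sample workload through the h5bench runner, as the CI jobs do
+export LD_LIBRARY_PATH=/opt/hdf5/lib:$LD_LIBRARY_PATH
+./h5bench --debug --abort-on-failure ../samples/sync-dlio.json
+```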
+
+## Configuration
+As with the other extensions, the following parameters should be specified in the configuration section of the
+JSON file to configure the benchmark:
+
+| Parameter              | Description                                                          | Type   | Default  |
+|------------------------|----------------------------------------------------------------------|--------|----------|
+| generate-data          | Enable generation of benchmarking data                              | bool   | false    |
+| train                  | Enable model training simulation                                    | bool   | false    |
+| evaluation             | Enable model evaluation simulation                                  | bool   | false    |
+| record-length          | Record size of a single sample in bytes                             | int    | 67108864 |
+| num-files-train        | The number of files used to train the model                         | int    | 32       |
+| num-files-eval         | The number of files used to evaluate the model                      | int    | 8        |
+| num-samples-per-file   | The number of samples in each file                                  | int    | 4        |
+| data-folder            | Name of the directory storing the benchmark data                    | string | ./data   |
+| file-prefix            | Prefix in the name of files containing training and evaluation data | string | img      |
+| chunking               | Enable chunking                                                     | bool   | false    |
+| chunk-size             | Chunk size                                                          | int    | 1024     |
+| keep-files             | Do not delete the data after the benchmark is finished              | bool   | false    |
+| compression            | Enable compression                                                  | bool   | false    |
+| compression-level      | Compression level from 1 to 9                                       | int    | 4        |
+| batch-size             | Training batch size                                                 | int    | 7        |
+| batch-size-eval        | Evaluation batch size                                               | int    | 2        |
+| shuffle                | Enable sample shuffling                                             | bool   | false    |
+| preprocess-time        | Preprocessing time after reading each sample, in seconds            | float  | 0.0      |
+| preprocess-time-stdev  | Standard deviation of the preprocessing time, in seconds            | float  | 0.0      |
+| epochs                 | The number of epochs                                                | int    | 5        |
+| total-training-steps   | Maximum number of training steps per epoch                          | int    | -1       |
+| computation-time       | Computation time after reading each batch, in seconds               | float  | 0.323    |
+| computation-time-stdev | Standard deviation of the computation time, in seconds              | float  | 0.0      |
+| random-seed            | Random seed to be used                                              | int    | 42       |
+| eval-time              | Evaluation time after reading each batch, in seconds                | float  | 0.323    |
+| eval-time-stdev        | Standard deviation of the evaluation time, in seconds               | float  | 0.0      |
+| epochs-between-evals   | The number of epochs between evaluations                            | int    | 1        |
+| train-data-folder      | Name of the directory containing the training data                  | string | train    |
+| valid-data-folder      | Name of the directory containing the validation data                | string | valid    |
+| records-dataset-name   | Name of the dataset with records                                    | string | records  |
+| labels-dataset-name    | Name of the dataset with labels                                     | string | labels   |
+| seed-change-epoch      | Enable seed changes every epoch                                     | bool   | false    |
+| read-threads           | The number of workers used to read the data                         | int    | 4        |
+| collective-meta        | Enable collective HDF5 metadata operations                          | bool   | false    |
+| collective-data        | Enable collective HDF5 data operations                              | bool   | false    |
+| subfiling              | Enable the HDF5 Subfiling Virtual File Driver                       | bool   | false    |
+| output-csv-name        | Name of the output CSV file                                         | string | output   |
+| output-ranks-data      | Enable statistics output for each rank                              | bool   | false    |
+
+Each parameter has a default value that applies if the parameter has not been specified
+in the configuration file. Note that with the defaults alone the benchmark will not run anything, because the generate-data, train and evaluation
+parameters are false. A sample configuration file can be found in the `samples/` directory; a minimal sketch is also shown below.
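+
+For illustration, a minimal configuration might look like the sketch below. The surrounding `mpi`/`benchmarks`
+structure follows the format of the other `samples/*.json` files, and the parameter values here are arbitrary
+examples rather than recommendations:
+
+```
+{
+  "mpi": {
+    "command": "mpirun",
+    "ranks": "8"
+  },
+  "directory": "storage",
+  "benchmarks": [
+    {
+      "benchmark": "dlio",
+      "configuration": {
+        "generate-data": "true",
+        "train": "true",
+        "evaluation": "true",
+        "num-files-train": "16",
+        "num-files-eval": "4",
+        "num-samples-per-file": "4",
+        "record-length": "67108864",
+        "epochs": "5",
+        "read-threads": "4"
+      }
+    }
+  ]
+}
+```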
+
+## Understanding the output
+The sample output of the benchmark is as follows:
+```
+=================== Performance Results ==================
+Total number of ranks: 8
+The number of read threads per rank: 0
+Total training set size: 7.000 GB
+Training set size per rank: 896.000 MB
+Total training emulated compute time: 3.229 s
+Training metadata time: 2.808 s
+Training raw read time: 30.905 s
+Training average raw read rate: 145.141 MB/s
+Observed training completion time: 37.432 s
+Observed average training rate: 131.044 MB/s
+Training average throughput: 1.871 samples/s
+Training throughput standard deviation: 0.037 samples/s
+Training average IO: 119.729 MB/s
+Training IO standard deviation: 2.379 MB/s
+Total evaluation set size: 7.000 GB
+Evaluation set size per rank: 896.000 MB
+Total evaluation emulated compute time: 3.206 s
+Evaluation metadata time: 2.805 s
+Evaluation raw read time: 31.699 s
+Evaluation average raw read rate: 141.906 MB/s
+Observed evaluation completion time: 38.424 s
+Observed average evaluation rate: 127.595 MB/s
+Evaluation average throughput avg: 1.826 samples/s
+Evaluation throughput standard deviation: 0.090 samples/s
+Evaluation average IO: 116.883 MB/s
+Evaluation IO standard deviation: 5.735 MB/s
+===========================================================
+```
+Let's take a closer look. The first two lines report the number of MPI ranks and the number of read threads
+(worker processes) per rank used in the simulation. The same metrics are then reported for both training and
+evaluation, so to reduce redundancy we discuss only the first half of the results, which concerns training. Total
+training set size is the combined size of all HDF5 files used for training; accordingly, the training set size per
+rank shows how much of the load is handled by a single MPI rank. Total training emulated compute time is the time
+spent on compute emulation, summed over all epochs. Training metadata time and training raw read time do not
+overlap: the former measures the execution time of the metadata operations (`H5Fopen`, `H5Dget_space`,
+`H5Screate_simple`, and `H5Sclose`), while the latter measures `H5Dread`. Training average raw read rate is the
+training set size per rank divided by the training raw read time; in the sample above, assuming the default 5
+epochs, each rank reads 896 MB per epoch in 30.905 s of total raw read time, giving 896 MB × 5 / 30.905 s ≈ 145 MB/s,
+which matches the reported value. Observed training completion time covers all time spent on the training process,
+including, among other things, resource allocation and computation simulation. Observed average training rate
+equals the training set size per rank divided by the difference between the observed training completion time and
+the total training emulated compute time, i.e., the data reading rate with emulation costs excluded. Training
+average throughput and training throughput standard deviation indicate how many samples of the training dataset
+are processed per second. Training average IO and training IO standard deviation translate these values into bytes
+per second by multiplying by the size of one sample.
+
+## Future work
+
+There are plans to add more configuration options to the extension in the future to increase its flexibility:
+- Add settings for the Subfiling VFD. Currently, the default settings are used.
+- Add more features from [DLIO Benchmark](https://github.com/argonne-lcf/dlio_benchmark), such as resizable records.
+- Analyze and add support for other ML frameworks and data loaders, e.g., DALI.
+- Add support for prefetching.
+- Expand the ability to randomly shuffle samples. At the moment, it is not possible to shuffle only the samples within each file
+without changing the order of the files used for training.
+- Add more compression filters and thus support different compression algorithms for HDF5 data.
+- Add support for drop_last customization. Currently, all batches left over after distribution across the MPI ranks are not processed.
+- Replace the use of `fork()` with `MPI_Comm_spawn()` when creating new processes, as using `fork()` with MPI may be unsafe.
+- Test support for the Cache VOL connector.
+- Add support for checkpointing by saving the model to an HDF5 file.
diff --git a/dlio/example_output.csv b/dlio/example_output.csv new file mode 100644 index 00000000..e56f28b3 --- /dev/null +++ b/dlio/example_output.csv @@ -0,0 +1,46 @@ +metric, value, unit
+operation, dlio,
+ranks, 16,
+read threads, 0,
+subfiling, NO,
+chunking, NO,
+collective meta, YES,
+collective data, YES,
+train total size, 63.000, GB
+train size per rank, 3.938, GB
+train emulated compute time per epoch, "2.907, 2.907, 2.907, 2.907, 2.907", "s, s, s, s, s"
+train emulated compute time, 14.535, s
+train metadata time per epoch, "0.559, 0.502, 0.434, 0.406, 0.482", "s, s, s, s, s"
+train metadata time, 2.383, s
+train raw read time per epoch, "4.205, 3.835, 3.767, 3.758, 3.830", "s, s, s, s, s"
+train total raw read time, 19.395, s
+train raw read rate per epoch, "958.749, 1.026, 1.045, 1.047, 1.027", "MB/s, GB/s, GB/s, GB/s, GB/s"
+train avg raw read rate, 1.017, GB/s
+train observed time per epoch, "8.709, 8.125, 7.789, 7.824, 8.775", "s, s, s, s, s"
+train observed time, 41.223, s
+train observed rate per epoch, "694.948, 772.636, 825.826, 820.059, 687.088", "MB/s, MB/s, MB/s, MB/s, MB/s"
+train avg observed rate, 760.111, MB/s
+train throughput samples per second per epoch, "7.234, 7.753, 8.088, 8.052, 7.179", "samples/s, samples/s, samples/s, samples/s, samples/s"
+train throughput avg samples per second, 7.661, samples/s
+train throughput stdev samples per second, 0.389, samples/s
+train io avg, 490.330, MB/s
+train io stdev, 24.925, MB/s
+eval total size, 16.000, GB
+eval size per rank, 1.000, GB
+eval emulated compute time per epoch, "2.584, 2.584, 2.584, 2.584, 2.584", "s, s, s, s, s"
+eval emulated compute time, 12.920, s
+eval metadata time per epoch, "0.214, 0.151, 0.162, 0.141, 0.181", "s, s, s, s, s"
+eval metadata time, 0.848, s
+eval raw read time per epoch, "0.925, 0.913, 0.875, 0.853, 0.824", "s, s, s, s, s"
+eval total raw read time, 4.390, s
+eval raw read rate per epoch, "1.080, 1.095, 1.143, 1.171, 1.213", "GB/s, GB/s, GB/s, GB/s, GB/s"
+eval avg raw read rate, 1.141, GB/s
+eval observed time per epoch, "4.120, 3.904, 3.881, 3.867, 3.940", "s, s, s, s, s"
+eval observed time, 19.712, s
+eval observed rate per epoch, "666.574, 775.895, 789.646, 797.822, 755.279", "MB/s, MB/s, MB/s, MB/s, MB/s"
+eval avg observed rate, 757.043, MB/s
+eval throughput samples per second per epoch, "3.883, 4.099, 4.123, 4.137, 4.061", "samples/s, samples/s, samples/s, samples/s, samples/s"
+eval throughput avg samples per second, 4.061, samples/s
+eval throughput stdev samples per second, 0.092, samples/s
+eval io avg, 259.877, MB/s
+eval io stdev, 5.907, MB/s
diff --git a/dlio/h5bench_dlio.c b/dlio/h5bench_dlio.c new file mode 100644 index 00000000..fddab7a0 --- /dev/null +++ b/dlio/h5bench_dlio.c @@ -0,0 +1,751 @@ +#include <assert.h>
+#include <hdf5.h>
+#include <math.h>
+#include <mpi.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "h5bench_dlio.h" +#include "stats.h" +#include "utils.h" +#include "workers.h" + +#ifdef HAVE_SUBFILING +#include "H5FDsubfiling.h" +#include "H5FDioc.h" +#endif + +// Maximum size of randomly generated data per file. If the file size is larger than the specified value, +// randomly generated data will be written to the file several times in a row. Default value is 2 GB +#define GENERATION_BUFFER_SIZE 2 * 1073741824lu + +// Global variables +int NUM_RANKS, MY_RANK; +uint32_t GENERATION_SIZE; +uint32_t DIM; +hid_t DCPL, FAPL, DAPL, DXPL; +MPI_Comm rest_training_steps_comm = MPI_COMM_WORLD; + +// Generating a dataset containing training data labels +void +generate_labels_dataset(hid_t file_id, hid_t filespace, hid_t memspace) +{ + hid_t dataset_id = H5Dcreate(file_id, config.LABELS_DATASET_NAME, H5T_STD_I64LE, filespace, H5P_DEFAULT, + H5P_DEFAULT, DAPL); + assert(dataset_id >= 0); + + uint64_t *data = (uint64_t *)malloc(config.NUM_SAMPLES_PER_FILE * sizeof(uint64_t)); + if (data == NULL) { + exit(1); + } + for (uint32_t i = 0; i < config.NUM_SAMPLES_PER_FILE; i++) { + data[i] = 0; + } + + hsize_t offset[1] = {0}; + hsize_t dims[1] = {config.NUM_SAMPLES_PER_FILE}; + H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, NULL, dims, NULL); + herr_t status = H5Dwrite(dataset_id, H5T_STD_I64LE, memspace, filespace, DXPL, data); + assert(status >= 0); + + free(data); + H5Dclose(dataset_id); +} + +// Generating a dataset containing random training data +void +generate_records_dataset(hid_t file_id, hid_t filespace, hid_t memspace, hid_t extra_memspace) +{ + hid_t dataset_id = + H5Dcreate(file_id, config.RECORDS_DATASET_NAME, H5T_STD_U8LE, filespace, H5P_DEFAULT, DCPL, DAPL); + assert(dataset_id >= 0); + + uint8_t *data = (uint8_t *)malloc(GENERATION_SIZE * sizeof(uint8_t)); + if (data == NULL) { + exit(1); + } + for (size_t i = 0; i < GENERATION_SIZE; i++) { + data[i] = rand() % 255; + } + + uint32_t num_iterations = (config.RECORD_LENGTH * config.NUM_SAMPLES_PER_FILE) / GENERATION_SIZE; + uint32_t extra_elements = (config.RECORD_LENGTH * config.NUM_SAMPLES_PER_FILE) % GENERATION_SIZE; + + hsize_t offset[3] = {0, 0, 0}; + hsize_t dims[3] = {config.NUM_SAMPLES_PER_FILE, DIM, DIM}; + + for (uint32_t i = 0; i < num_iterations; i++) { + offset[0] = i * config.RECORD_LENGTH * config.NUM_SAMPLES_PER_FILE; + H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, NULL, dims, NULL); + herr_t status = H5Dwrite(dataset_id, H5T_STD_U8LE, memspace, filespace, DXPL, data); + assert(status >= 0); + } + + if (extra_elements > 0) { + hsize_t extra_count[3] = {extra_elements, DIM, DIM}; + offset[0] = num_iterations * config.RECORD_LENGTH * config.NUM_SAMPLES_PER_FILE; + H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, NULL, extra_count, NULL); + herr_t status = H5Dwrite(dataset_id, H5T_STD_U8LE, extra_memspace, filespace, DXPL, data); + assert(status >= 0); + } + + free(data); + H5Dclose(dataset_id); +} + +// Generating a hdf5 file containing a dataset with random training data and a dataset with labels +void +generate_file(const char *file_name, hid_t labels_filespace, hid_t labels_memspace, hid_t records_filespace, + hid_t records_memspace, hid_t extra_records_memspace) +{ + hid_t file_id = H5Fcreate(file_name, H5F_ACC_TRUNC, H5P_DEFAULT, FAPL); + assert(file_id >= 0); + + generate_records_dataset(file_id, records_filespace, records_memspace, extra_records_memspace); + generate_labels_dataset(file_id, labels_filespace, labels_memspace); + + H5Fclose(file_id); +} + +// Distribution of file 
generation work among MPI ranks +void +generate_data() +{ + if (MY_RANK == 0) { + printf("Starting data generation\n"); + printf("Number of files for training dataset: %u\n", config.NUM_FILES_TRAIN); + printf("Number of files for evaluation dataset: %u\n", config.NUM_FILES_EVAL); + } + + hsize_t labels_dims[1] = {config.NUM_SAMPLES_PER_FILE}; + hid_t labels_filespace = H5Screate_simple(1, labels_dims, NULL); + assert(labels_filespace >= 0); + hid_t labels_memspace = H5Screate_simple(1, labels_dims, NULL); + assert(labels_memspace >= 0); + + hsize_t records_dims[3] = {config.NUM_SAMPLES_PER_FILE, DIM, DIM}; + hid_t records_filespace = H5Screate_simple(3, records_dims, NULL); + assert(records_filespace >= 0); + hid_t records_memspace = H5Screate_simple(3, records_dims, NULL); + assert(records_memspace >= 0); + + hsize_t extra_records_count[3] = {(config.RECORD_LENGTH * config.NUM_SAMPLES_PER_FILE) % GENERATION_SIZE, + DIM, DIM}; + hid_t extra_records_memspace = H5Screate_simple(3, extra_records_count, NULL); + assert(extra_records_memspace >= 0); + + uint32_t from = config.SUBFILING ? 0 : MY_RANK; + uint32_t increment = config.SUBFILING ? 1 : NUM_RANKS; + + for (uint32_t i = from; i < config.NUM_FILES_TRAIN; i += increment) { + srand(config.RANDOM_SEED + i); + + // if (!config.SUBFILING || config.SUBFILING && (MY_RANK == 0)) + // printf("Generate train file %u / %u\n", i + 1, config.NUM_FILES_TRAIN); + char file_name[256]; + snprintf(file_name, sizeof(file_name), "%s/%s/%s_%u_of_%u.h5", config.DATA_FOLDER, + config.TRAIN_DATA_FOLDER, config.FILE_PREFIX, i + 1, config.NUM_FILES_TRAIN); + generate_file(file_name, labels_filespace, labels_memspace, records_filespace, records_memspace, + extra_records_memspace); + } + + for (uint32_t i = from; i < config.NUM_FILES_EVAL; i += increment) { + srand(config.RANDOM_SEED + config.NUM_FILES_TRAIN + i); + + // if (!config.SUBFILING || config.SUBFILING && (MY_RANK == 0)) + // printf("Generate valid file %u / %u\n", i + 1, config.NUM_FILES_EVAL); + char file_name[256]; + snprintf(file_name, sizeof(file_name), "%s/%s/%s_%u_of_%u.h5", config.DATA_FOLDER, + config.VALID_DATA_FOLDER, config.FILE_PREFIX, i + 1, config.NUM_FILES_EVAL); + generate_file(file_name, labels_filespace, labels_memspace, records_filespace, records_memspace, + extra_records_memspace); + } + + H5Sclose(labels_memspace); + H5Sclose(labels_filespace); + H5Sclose(records_memspace); + H5Sclose(extra_records_memspace); + H5Sclose(records_filespace); + + if (MY_RANK == 0) { + printf("Generation done\n"); + } +} + +// Read a given sample from a given hdf5 file +void +read_sample(const char *file_path, uint32_t sample, uint64_t *metadata_time_out, uint64_t *read_time_out) +{ + hsize_t offset[3] = {sample, 0, 0}; + hsize_t count[3] = {1, DIM, DIM}; + + uint64_t t1 = get_time_usec_return_uint64(); + hid_t file_id = H5Fopen(file_path, H5F_ACC_RDONLY, FAPL); + hid_t dataset_id = H5Dopen(file_id, config.RECORDS_DATASET_NAME, DAPL); + hid_t filespace = H5Dget_space(dataset_id); + hid_t memspace = H5Screate_simple(3, count, NULL); + H5Sselect_hyperslab(filespace, H5S_SELECT_SET, offset, NULL, count, NULL); + uint64_t t2 = get_time_usec_return_uint64(); + assert(file_id >= 0); + assert(dataset_id >= 0); + assert(filespace >= 0); + assert(memspace >= 0); + + uint8_t *data = (uint8_t *)malloc(DIM * DIM * sizeof(uint8_t)); + if (data == NULL) { + exit(1); + } + + uint64_t t3 = get_time_usec_return_uint64(); + herr_t status = H5Dread(dataset_id, H5T_STD_U8LE, memspace, filespace, DXPL, data); + uint64_t t4 = 
get_time_usec_return_uint64();
+    assert(status >= 0);
+
+    free(data); // TODO: free memory only after compute() call?
+
+    uint64_t t5 = get_time_usec_return_uint64();
+    H5Sclose(memspace);
+    H5Sclose(filespace);
+    H5Dclose(dataset_id);
+    H5Fclose(file_id);
+    uint64_t t6 = get_time_usec_return_uint64();
+
+    *metadata_time_out = (t2 - t1) + (t6 - t5);
+    *read_time_out = t4 - t3;
+
+    compute(config.PREPROCESS_TIME, config.PREPROCESS_TIME_STDEV);
+}
+
+// Simulation of computation by means of the usleep() function
+uint64_t
+compute(float time, float time_stdev)
+{
+    if (time != 0.0 || time_stdev != 0.0) {
+        int64_t t = (int64_t)(generate_normal_random(time, time_stdev) * 1000000.0);
+        if (t < 0)
+            t = 0;
+        usleep(t);
+        return t;
+    }
+    return 0;
+}
+
+// Evaluation process simulation without the use of multiprocessing and workers
+void
+eval_without_workers(uint32_t epoch, uint32_t *indices, uint64_t *local_metadata_time_out,
+                     uint64_t *local_read_time_out)
+{
+    uint32_t offset = MY_RANK * config.NUM_EVAL_BATCHES_PER_RANK;
+
+    uint64_t t0 = get_time_usec_return_uint64();
+    for (uint32_t i = 0; i < config.NUM_EVAL_BATCHES_PER_RANK; i++) {
+        for (uint32_t j = 0; j < config.BATCH_SIZE_EVAL; j++) {
+            uint32_t file_num =
+                indices[offset + i * config.BATCH_SIZE_EVAL + j] / config.NUM_SAMPLES_PER_FILE + 1;
+            uint32_t sample_num =
+                indices[offset + i * config.BATCH_SIZE_EVAL + j] % config.NUM_SAMPLES_PER_FILE;
+            char file_path[256];
+            snprintf(file_path, sizeof(file_path), "%s/%s/%s_%u_of_%u.h5", config.DATA_FOLDER,
+                     config.VALID_DATA_FOLDER, config.FILE_PREFIX, file_num, config.NUM_FILES_EVAL);
+
+            uint64_t metadata_time = 0, read_time = 0;
+            read_sample(file_path, sample_num, &metadata_time, &read_time);
+
+            *local_metadata_time_out += metadata_time;
+            *local_read_time_out += read_time;
+        }
+
+        batch_loaded_eval(epoch, t0);
+
+        uint64_t t = compute(config.EVAL_TIME, config.EVAL_TIME_STDEV);
+        batch_processed_eval(epoch, t, t0);
+        MPI_Barrier(MPI_COMM_WORLD);
+
+        t0 = get_time_usec_return_uint64();
+    }
+}
+
+// Evaluation process simulation using multiprocessing and workers
+void
+eval_using_workers(uint32_t epoch, uint64_t *local_metadata_time_out, uint64_t *local_read_time_out)
+{
+    force_workers_to_shuffle(get_eval_read_fd(), get_eval_write_fd(), get_eval_system_fd());
+
+    uint32_t offset = MY_RANK * config.NUM_EVAL_BATCHES_PER_RANK;
+
+    for (uint32_t i = 0;
+         i < (config.READ_THREADS > config.NUM_EVAL_BATCHES_PER_RANK ? config.NUM_EVAL_BATCHES_PER_RANK
+                                                                     : config.READ_THREADS);
+         i++) {
+        int32_t batch = offset + i;
+        write(get_eval_write_fd(), &batch, sizeof(batch));
+    }
+
+    for (uint32_t i = config.READ_THREADS; i < config.NUM_EVAL_BATCHES_PER_RANK; i++) {
+        execution_time_t data_from_child_process;
+        uint64_t t0 = get_time_usec_return_uint64();
+        read(get_eval_read_fd(), &data_from_child_process, sizeof(data_from_child_process));
+
+        batch_loaded_eval(epoch, t0);
+
+        *local_metadata_time_out += data_from_child_process.metadata_time;
+        *local_read_time_out += data_from_child_process.read_time;
+
+        int32_t batch = offset + i;
+        write(get_eval_write_fd(), &batch, sizeof(batch));
+
+        uint64_t t = compute(config.EVAL_TIME, config.EVAL_TIME_STDEV);
+        batch_processed_eval(epoch, t, t0);
+        MPI_Barrier(MPI_COMM_WORLD);
+    }
+
+    for (uint32_t i = 0;
+         i < (config.READ_THREADS > config.NUM_EVAL_BATCHES_PER_RANK ?
config.NUM_EVAL_BATCHES_PER_RANK + : config.READ_THREADS); + i++) { + execution_time_t data_from_child_process; + uint64_t t0 = get_time_usec_return_uint64(); + read(get_eval_read_fd(), &data_from_child_process, sizeof(data_from_child_process)); + + batch_loaded_eval(epoch, t0); + + *local_metadata_time_out += data_from_child_process.metadata_time; + *local_read_time_out += data_from_child_process.read_time; + + uint64_t t = compute(config.EVAL_TIME, config.EVAL_TIME_STDEV); + batch_processed_eval(epoch, t, t0); + MPI_Barrier(MPI_COMM_WORLD); + } +} + +// Preparing and selecting a way to simulate the evaluation process +void +eval(uint32_t epoch, uint32_t *indices, bool enable_multiprocessing) +{ + uint64_t eval_metadata_time = 0, eval_read_time = 0; + if (enable_multiprocessing) { + start_eval(epoch); + eval_using_workers(epoch, &eval_metadata_time, &eval_read_time); + end_eval(epoch, eval_metadata_time, eval_read_time); + return; + } + + if (config.SEED_CHANGE_EPOCH) + srand(config.RANDOM_SEED * 2 + epoch); + if (config.DO_SHUFFLE) + shuffle(indices, config.NUM_FILES_EVAL * config.NUM_SAMPLES_PER_FILE); + + start_eval(epoch); + eval_without_workers(epoch, indices, &eval_metadata_time, &eval_read_time); + end_eval(epoch, eval_metadata_time, eval_read_time); +} + +// Training process simulation without the use of multiprocessing and workers +void +train_without_workers(uint32_t epoch, uint32_t *indices, uint64_t *local_metadata_time_out, + uint64_t *local_read_time_out) +{ + uint32_t offset = MY_RANK * config.NUM_TRAIN_BATCHES_PER_RANK; + + uint64_t t0 = get_time_usec_return_uint64(); + for (uint32_t i = 0; i < config.NUM_TRAIN_BATCHES_PER_RANK; i++) { + if (i == config.TOTAL_TRAINING_STEPS_PER_RANK) { + break; + } + for (uint32_t j = 0; j < config.BATCH_SIZE; j++) { + uint32_t file_num = indices[offset + i * config.BATCH_SIZE + j] / config.NUM_SAMPLES_PER_FILE + 1; + uint32_t sample_num = indices[offset + i * config.BATCH_SIZE + j] % config.NUM_SAMPLES_PER_FILE; + char file_path[256]; + snprintf(file_path, sizeof(file_path), "%s/%s/%s_%u_of_%u.h5", config.DATA_FOLDER, + config.TRAIN_DATA_FOLDER, config.FILE_PREFIX, file_num, config.NUM_FILES_TRAIN); + + uint64_t metadata_time = 0, read_time = 0; + read_sample(file_path, sample_num, &metadata_time, &read_time); + + *local_metadata_time_out += metadata_time; + *local_read_time_out += read_time; + } + + batch_loaded_train(epoch, t0); + + uint64_t t = compute(config.COMPUTATION_TIME, config.COMPUTATION_TIME_STDEV); + batch_processed_train(epoch, t, t0); + if ((MY_RANK < config.TOTAL_TRAINING_STEPS % NUM_RANKS) && + (i + 1 == config.TOTAL_TRAINING_STEPS_PER_RANK)) { + MPI_Barrier(rest_training_steps_comm); + } + else { + MPI_Barrier(MPI_COMM_WORLD); + } + + t0 = get_time_usec_return_uint64(); + } +} + +// Training process simulation using multiprocessing and workers +void +train_using_workers(uint32_t epoch, uint64_t *local_metadata_time_out, uint64_t *local_read_time_out) +{ + force_workers_to_shuffle(get_train_read_fd(), get_train_write_fd(), get_train_system_fd()); + uint32_t offset = MY_RANK * config.NUM_TRAIN_BATCHES_PER_RANK; + + for (uint32_t i = 0; i < config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN; i++) { + int32_t batch = offset + i; + write(get_train_write_fd(), &batch, sizeof(batch)); + } + + for (uint32_t i = config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN; i < config.NUM_TRAIN_BATCHES_PER_RANK; + i++) { + if (i == config.TOTAL_TRAINING_STEPS_PER_RANK) { + break; + } + execution_time_t data_from_child_process; + uint64_t t0 = 
get_time_usec_return_uint64(); + read(get_train_read_fd(), &data_from_child_process, sizeof(data_from_child_process)); + + batch_loaded_train(epoch, t0); + + *local_metadata_time_out += data_from_child_process.metadata_time; + *local_read_time_out += data_from_child_process.read_time; + + int32_t batch = offset + i; + write(get_train_write_fd(), &batch, sizeof(batch)); + + uint64_t t = compute(config.COMPUTATION_TIME, config.COMPUTATION_TIME_STDEV); + batch_processed_train(epoch, t, t0); + + if ((MY_RANK < config.TOTAL_TRAINING_STEPS % NUM_RANKS) && + (i + 1 == config.TOTAL_TRAINING_STEPS_PER_RANK)) { + MPI_Barrier(rest_training_steps_comm); + } + else { + MPI_Barrier(MPI_COMM_WORLD); + } + } + + for (uint32_t i = 0; i < config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN; i++) { + execution_time_t data_from_child_process; + uint64_t t0 = get_time_usec_return_uint64(); + read(get_train_read_fd(), &data_from_child_process, sizeof(data_from_child_process)); + + batch_loaded_train(epoch, t0); + + *local_metadata_time_out += data_from_child_process.metadata_time; + *local_read_time_out += data_from_child_process.read_time; + + uint64_t t = compute(config.COMPUTATION_TIME, config.COMPUTATION_TIME_STDEV); + batch_processed_train(epoch, t, t0); + if ((MY_RANK < config.TOTAL_TRAINING_STEPS % NUM_RANKS) && + (i + 1 == config.TOTAL_TRAINING_STEPS_PER_RANK)) { + MPI_Barrier(rest_training_steps_comm); + } + else { + MPI_Barrier(MPI_COMM_WORLD); + } + } +} + +// Preparing and selecting a way to simulate the training process +void +train(uint32_t epoch, uint32_t *indices, bool enable_multiprocessing) +{ + uint64_t train_metadata_time = 0, train_read_time = 0; + if (enable_multiprocessing) { + start_train(epoch); + train_using_workers(epoch, &train_metadata_time, &train_read_time); + end_train(epoch, train_metadata_time, train_read_time); + return; + } + + if (config.SEED_CHANGE_EPOCH) + srand(config.RANDOM_SEED + epoch); + if (config.DO_SHUFFLE) + shuffle(indices, config.NUM_FILES_TRAIN * config.NUM_SAMPLES_PER_FILE); + + start_train(epoch); + train_without_workers(epoch, indices, &train_metadata_time, &train_read_time); + end_train(epoch, train_metadata_time, train_read_time); +} + +// Starting the benchmark and simulation process of training and evaluation +void +run() +{ + uint32_t total_train_samples = config.NUM_FILES_TRAIN * config.NUM_SAMPLES_PER_FILE; + uint32_t *indices_train = (uint32_t *)malloc(total_train_samples * sizeof(uint32_t)); + if (indices_train == NULL) { + exit(1); + } + for (uint32_t i = 0; i < total_train_samples; i++) { + indices_train[i] = i; + } + + uint32_t total_eval_samples = config.NUM_FILES_EVAL * config.NUM_SAMPLES_PER_FILE; + uint32_t *indices_eval = (uint32_t *)malloc(total_eval_samples * sizeof(uint32_t)); + if (indices_eval == NULL) { + exit(1); + } + for (unsigned long i = 0; i < total_eval_samples; i++) { + indices_eval[i] = i; + } + + uint32_t next_eval_epoch = config.EPOCHS_BETWEEN_EVALS; + bool enable_multiprocessing = config.READ_THREADS > 0; + if (enable_multiprocessing) { + init_workers(indices_train, indices_eval); + } + + MPI_Barrier(MPI_COMM_WORLD); + + for (uint32_t epoch = 0; epoch < config.EPOCHS; epoch++) { + if (MY_RANK == 0) + printf("Starting epoch %u...\n", epoch + 1); + + train(epoch, indices_train, enable_multiprocessing); + MPI_Barrier(MPI_COMM_WORLD); + + if (config.DO_EVALUATION && (epoch + 1 >= next_eval_epoch)) { + if (MY_RANK == 0) + printf("Starting evaluation...\n"); + eval(epoch, indices_eval, enable_multiprocessing); + next_eval_epoch += 
config.EPOCHS_BETWEEN_EVALS; + MPI_Barrier(MPI_COMM_WORLD); + } + } + if (enable_multiprocessing) { + fin_workers(); + } + + free(indices_train); + free(indices_eval); + + MPI_Barrier(MPI_COMM_WORLD); +} + +// Initialization of some global variables and settings for benchmark operation +void +init_global_variables() +{ + if (MY_RANK == 0) { + if (config.EPOCHS == 0) { + printf("The value of parameter \"epochs\" must be greater than 0\n"); + } + if (config.NUM_SAMPLES_PER_FILE == 0) { + printf("The value of parameter \"num-samples-per-file\" must be greater than 0\n"); + } + if (config.BATCH_SIZE == 0) { + printf("The value of parameter \"batch-size\" must be greater than 0\n"); + } + if (config.BATCH_SIZE_EVAL == 0) { + printf("The value of parameter \"batch-size-eval\" must be greater than 0\n"); + } + } + + DIM = (uint32_t)sqrt(config.RECORD_LENGTH); + config.RECORD_LENGTH = DIM * DIM; + + uint32_t chunk_dimension = (uint32_t)sqrt(config.CHUNK_SIZE); + chunk_dimension = chunk_dimension > DIM ? DIM : chunk_dimension; + config.CHUNK_SIZE = chunk_dimension * chunk_dimension; + + uint32_t data_length = config.RECORD_LENGTH * config.NUM_SAMPLES_PER_FILE; + GENERATION_SIZE = data_length > GENERATION_BUFFER_SIZE ? GENERATION_BUFFER_SIZE : data_length; + + config.NUM_TRAIN_BATCHES_PER_RANK = + config.NUM_FILES_TRAIN * config.NUM_SAMPLES_PER_FILE / config.BATCH_SIZE / NUM_RANKS; + config.NUM_EVAL_BATCHES_PER_RANK = + config.NUM_FILES_EVAL * config.NUM_SAMPLES_PER_FILE / config.BATCH_SIZE_EVAL / NUM_RANKS; + + config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN = config.READ_THREADS > config.NUM_TRAIN_BATCHES_PER_RANK + ? config.NUM_TRAIN_BATCHES_PER_RANK + : config.READ_THREADS; + config.NUM_OF_ACTUALLY_USED_PROCESSES_EVAL = config.READ_THREADS > config.NUM_EVAL_BATCHES_PER_RANK + ? config.NUM_EVAL_BATCHES_PER_RANK + : config.READ_THREADS; + + if (config.TOTAL_TRAINING_STEPS != -1 && + config.TOTAL_TRAINING_STEPS < config.NUM_TRAIN_BATCHES_PER_RANK * NUM_RANKS) { + config.TOTAL_TRAINING_STEPS_PER_RANK = config.TOTAL_TRAINING_STEPS / NUM_RANKS; + if (MY_RANK < config.TOTAL_TRAINING_STEPS % NUM_RANKS) { + config.TOTAL_TRAINING_STEPS_PER_RANK++; + MPI_Comm_split(MPI_COMM_WORLD, 0, MY_RANK, &rest_training_steps_comm); + } + else { + MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, MY_RANK, &rest_training_steps_comm); + } + + config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN = + config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN > config.TOTAL_TRAINING_STEPS_PER_RANK + ? config.TOTAL_TRAINING_STEPS_PER_RANK + : config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN; + } + else { + config.TOTAL_TRAINING_STEPS = -1; + } + + srand(config.RANDOM_SEED); + + // drop last warning + +#ifndef HAVE_SUBFILING + config.SUBFILING = false; +#endif + + FAPL = H5Pcreate(H5P_FILE_ACCESS); + DCPL = H5Pcreate(H5P_DATASET_CREATE); + DAPL = H5Pcreate(H5P_DATASET_ACCESS); + DXPL = H5Pcreate(H5P_DATASET_XFER); + + if (config.SUBFILING) { + if (MY_RANK == 0) + printf("Using Subfiling VFD\n"); +#ifdef HAVE_SUBFILING + H5Pset_fapl_subfiling(FAPL, NULL); +#endif + if (config.COLLECTIVE_DATA) { + if (MY_RANK == 0) + printf("Warning: Collective mode can't be used with subfiling\n"); + config.COLLECTIVE_DATA = false; + } + if (config.DO_CHUNKING) { + if (MY_RANK == 0) + printf("Warning: Chunking can't be used with subfiling\n"); + config.DO_CHUNKING = false; + } + if (config.READ_THREADS > 0) { + if (MY_RANK == 0) + printf( + "Warning: Multiprocessing can't be used with subfiling. 
READ_THREADS is set to 0...\n");
+            config.READ_THREADS = 0;
+        }
+    }
+    else if (config.DO_CHUNKING) {
+        if (MY_RANK == 0)
+            printf("Using chunking with the chunk shape (1, %u, %u)", chunk_dimension, chunk_dimension);
+        hsize_t chunk_dims[3] = {1, chunk_dimension, chunk_dimension};
+        H5Pset_chunk(DCPL, 3, chunk_dims);
+        if (config.DO_COMPRESSION) {
+            if (MY_RANK == 0)
+                printf(" and compression (level %u)", config.COMPRESSION_LEVEL);
+            H5Pset_deflate(DCPL, config.COMPRESSION_LEVEL);
+        }
+        if (MY_RANK == 0)
+            printf("\n");
+        if (config.COLLECTIVE_DATA) {
+            if (MY_RANK == 0)
+                printf("Warning: Collective mode can't be used with chunking\n");
+            config.COLLECTIVE_DATA = false;
+        }
+    }
+    else {
+        H5Pset_fapl_mpio(FAPL, MPI_COMM_SELF, MPI_INFO_NULL);
+        if (config.COLLECTIVE_DATA) {
+            if (MY_RANK == 0)
+                printf("Using collective I/O mode\n");
+            H5Pset_dxpl_mpio(DXPL, H5FD_MPIO_COLLECTIVE);
+        }
+        else {
+            if (MY_RANK == 0)
+                printf("Using independent I/O mode\n");
+            H5Pset_dxpl_mpio(DXPL, H5FD_MPIO_INDEPENDENT);
+        }
+    }
+
+#if H5_VERSION_GE(1, 10, 0)
+    if (config.COLLECTIVE_META) {
+        if (MY_RANK == 0)
+            printf("Using collective meta-data I/O mode\n");
+        H5Pset_all_coll_metadata_ops(FAPL, true);
+        H5Pset_coll_metadata_write(FAPL, true);
+        H5Pset_all_coll_metadata_ops(DAPL, true);
+    }
+#endif
+
+    if ((MY_RANK == 0) && config.DO_TRAIN) {
+        printf("The number of training batches per rank: %u\n", config.NUM_TRAIN_BATCHES_PER_RANK);
+        if (config.READ_THREADS > config.NUM_TRAIN_BATCHES_PER_RANK) {
+            printf("Warning: The number of requested read threads (%u) is greater than the number of "
+                   "training batches per rank (%u)!\n",
+                   config.READ_THREADS, config.NUM_TRAIN_BATCHES_PER_RANK);
+        }
+        printf("The number of evaluation batches per rank: %u\n", config.NUM_EVAL_BATCHES_PER_RANK);
+        if (config.READ_THREADS > config.NUM_EVAL_BATCHES_PER_RANK) {
+            printf("Warning: The number of requested read threads (%u) is greater than the number of "
+                   "evaluation batches per rank (%u)!\n",
+                   config.READ_THREADS, config.NUM_EVAL_BATCHES_PER_RANK);
+        }
+    }
+}
+
+int
+main(int argc, char *argv[])
+{
+    int mpi_thread_lvl_provided = -1;
+    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &mpi_thread_lvl_provided);
+    assert(MPI_THREAD_MULTIPLE == mpi_thread_lvl_provided);
+    MPI_Comm_rank(MPI_COMM_WORLD, &MY_RANK);
+    MPI_Comm_size(MPI_COMM_WORLD, &NUM_RANKS);
+
+    parse_args(argc, argv);
+
+    if (MY_RANK == 0) {
+        printf("Create directory \"%s\"... ", config.DATA_FOLDER);
+        create_directory(config.DATA_FOLDER);
+        printf("OK\n");
+
+        printf("Create directory \"%s/%s\"... ", config.DATA_FOLDER, config.TRAIN_DATA_FOLDER);
+        char dir_name[256];
+        snprintf(dir_name, sizeof(dir_name), "%s/%s", config.DATA_FOLDER, config.TRAIN_DATA_FOLDER);
+        create_directory(dir_name);
+        printf("OK\n");
+
+        printf("Create directory \"%s/%s\"... ", config.DATA_FOLDER, config.VALID_DATA_FOLDER);
+        snprintf(dir_name, sizeof(dir_name), "%s/%s", config.DATA_FOLDER, config.VALID_DATA_FOLDER);
+        create_directory(dir_name);
+        printf("OK\n");
+
+        printf("Create directory \"%s\"... ", config.OUTPUT_DATA_FOLDER);
", config.OUTPUT_DATA_FOLDER); + snprintf(dir_name, sizeof(dir_name), "%s", config.OUTPUT_DATA_FOLDER); + create_directory(dir_name); + printf("OK\n"); + } + init_global_variables(); + MPI_Barrier(MPI_COMM_WORLD); + + if (config.DO_DATA_GENERATION) { + generate_data(); + } + + if (config.DO_TRAIN) { + // TODO: check files dimension if generate=no + stats_initialize(); + + run(); + prepare_data(); + MPI_Barrier(MPI_COMM_WORLD); + + if (config.OUTPUT_RANKS_DATA) { + print_rank_data(); + } + + if (MY_RANK == 0) { + print_average_data(); + } + + stats_finalize(); + } + + if (!config.KEEP_FILES && MY_RANK == 0) { + delete_directory(config.DATA_FOLDER); + } + + H5Pclose(DCPL); + H5Pclose(DXPL); + H5Pclose(DAPL); + H5Pclose(FAPL); + H5close(); + MPI_Finalize(); + return 0; +} diff --git a/dlio/h5bench_dlio.h b/dlio/h5bench_dlio.h new file mode 100644 index 00000000..61ca8ce8 --- /dev/null +++ b/dlio/h5bench_dlio.h @@ -0,0 +1,40 @@ +#ifndef SANDBOX_H5BENCH_ML_READ_H +#define SANDBOX_H5BENCH_ML_READ_H + +#include + +extern int NUM_RANKS, MY_RANK; + +void generate_labels_dataset(hid_t file_id, hid_t filespace, hid_t memspace); + +void generate_records_dataset(hid_t file_id, hid_t filespace, hid_t memspace, hid_t extra_memspace); + +void generate_file(const char *file_name, hid_t labels_filespace, hid_t labels_memspace, + hid_t records_filespace, hid_t records_memspace, hid_t extra_records_memspace); + +void generate_data(); + +void read_sample(const char *file_path, uint32_t sample, uint64_t *metadata_time_out, + uint64_t *read_time_out); + +uint64_t compute(float time, float time_stdev); + +void eval_without_workers(uint32_t epoch, uint32_t *indices, uint64_t *local_metadata_time_out, + uint64_t *local_read_time_out); + +void eval_using_workers(uint32_t epoch, uint64_t *local_metadata_time_out, uint64_t *local_read_time_out); + +void eval(uint32_t epoch, uint32_t *indices, bool enable_multiprocessing); + +void train_without_workers(uint32_t epoch, uint32_t *indices, uint64_t *local_metadata_time_out, + uint64_t *local_read_time_out); + +void train_using_workers(uint32_t epoch, uint64_t *local_metadata_time_out, uint64_t *local_read_time_out); + +void train(uint32_t epoch, uint32_t *indices, bool enable_multiprocessing); + +void run(); + +void init_global_variables(); + +#endif // SANDBOX_H5BENCH_ML_READ_H diff --git a/dlio/stats.c b/dlio/stats.c new file mode 100644 index 00000000..ae9e63e6 --- /dev/null +++ b/dlio/stats.c @@ -0,0 +1,1192 @@ +#include +#include +#include +#include +#include + +#include "../commons/h5bench_util.h" +#include "h5bench_dlio.h" +#include "stats.h" +#include "utils.h" + +epoch_data_t *stats; +epoch_data_t *global_stats; + +uint32_t *last_load_train; +uint32_t *last_load_eval; +uint32_t *last_proc_train; +uint32_t *last_proc_eval; +uint32_t *last_compute_train; +uint32_t *last_compute_eval; + +// Initialization of variables for storing statistics information +void +stats_initialize() +{ + stats = (struct epoch_data *)malloc(config.EPOCHS * sizeof(struct epoch_data)); + if (stats == NULL) { + exit(1); + } + + for (uint32_t i = 0; i < config.EPOCHS; i++) { + stats[i].load.train = (uint64_t *)calloc(config.NUM_TRAIN_BATCHES_PER_RANK, sizeof(uint64_t)); + if (stats[i].load.train == NULL) { + exit(1); + } + stats[i].load.eval = (uint64_t *)calloc(config.NUM_EVAL_BATCHES_PER_RANK, sizeof(uint64_t)); + if (stats[i].load.eval == NULL) { + exit(1); + } + stats[i].proc.train = (uint64_t *)calloc(config.NUM_TRAIN_BATCHES_PER_RANK, sizeof(uint64_t)); + if (stats[i].proc.train == 
+            exit(1);
+        }
+        stats[i].proc.eval = (uint64_t *)calloc(config.NUM_EVAL_BATCHES_PER_RANK, sizeof(uint64_t));
+        if (stats[i].proc.eval == NULL) {
+            exit(1);
+        }
+        stats[i].throughput.train = 0.0;
+        stats[i].throughput.eval = 0.0;
+        stats[i].compute.train = (uint64_t *)calloc(config.NUM_TRAIN_BATCHES_PER_RANK, sizeof(uint64_t));
+        if (stats[i].compute.train == NULL) {
+            exit(1);
+        }
+        stats[i].compute.eval = (uint64_t *)calloc(config.NUM_EVAL_BATCHES_PER_RANK, sizeof(uint64_t));
+        if (stats[i].compute.eval == NULL) {
+            exit(1);
+        }
+        stats[i].observed_time.train = 0;
+        stats[i].observed_time.eval = 0;
+        stats[i].metadata_time.train = 0;
+        stats[i].metadata_time.eval = 0;
+        stats[i].raw_read_time.train = 0;
+        stats[i].raw_read_time.eval = 0;
+    }
+
+    last_load_train = calloc(config.EPOCHS, sizeof(uint32_t));
+    if (last_load_train == NULL) {
+        exit(1);
+    }
+    last_load_eval = calloc(config.EPOCHS, sizeof(uint32_t));
+    if (last_load_eval == NULL) {
+        exit(1);
+    }
+    last_proc_train = calloc(config.EPOCHS, sizeof(uint32_t));
+    if (last_proc_train == NULL) {
+        exit(1);
+    }
+    last_proc_eval = calloc(config.EPOCHS, sizeof(uint32_t));
+    if (last_proc_eval == NULL) {
+        exit(1);
+    }
+    last_compute_train = calloc(config.EPOCHS, sizeof(uint32_t));
+    if (last_compute_train == NULL) {
+        exit(1);
+    }
+    last_compute_eval = calloc(config.EPOCHS, sizeof(uint32_t));
+    if (last_compute_eval == NULL) {
+        exit(1);
+    }
+}
+
+// Release of resources initialized for storing statistics information
+void
+stats_finalize()
+{
+    free(last_load_train);
+    free(last_load_eval);
+    free(last_proc_train);
+    free(last_proc_eval);
+    free(last_compute_train);
+    free(last_compute_eval);
+
+    for (uint32_t i = 0; i < config.EPOCHS; i++) {
+        free(stats[i].load.train);
+        free(stats[i].load.eval);
+        free(stats[i].proc.train);
+        free(stats[i].proc.eval);
+        free(stats[i].compute.train);
+        free(stats[i].compute.eval);
+
+        free(global_stats[i].load.train);
+        free(global_stats[i].load.eval);
+        free(global_stats[i].proc.train);
+        free(global_stats[i].proc.eval);
+        free(global_stats[i].compute.train);
+        free(global_stats[i].compute.eval);
+    }
+
+    free(stats);
+    free(global_stats);
+}
+
+// Gather the per-rank statistics on rank 0: sum each metric across ranks, then average by the number of ranks
+void
+prepare_data()
+{
+    global_stats = (struct epoch_data *)malloc(config.EPOCHS * sizeof(struct epoch_data));
+    if (global_stats == NULL) {
+        exit(1);
+    }
+
+    for (uint32_t i = 0; i < config.EPOCHS; i++) {
+        global_stats[i].load.train = (uint64_t *)calloc(config.NUM_TRAIN_BATCHES_PER_RANK, sizeof(uint64_t));
+        if (global_stats[i].load.train == NULL) {
+            exit(1);
+        }
+        global_stats[i].load.eval = (uint64_t *)calloc(config.NUM_EVAL_BATCHES_PER_RANK, sizeof(uint64_t));
+        if (global_stats[i].load.eval == NULL) {
+            exit(1);
+        }
+        global_stats[i].proc.train = (uint64_t *)calloc(config.NUM_TRAIN_BATCHES_PER_RANK, sizeof(uint64_t));
+        if (global_stats[i].proc.train == NULL) {
+            exit(1);
+        }
+        global_stats[i].proc.eval = (uint64_t *)calloc(config.NUM_EVAL_BATCHES_PER_RANK, sizeof(uint64_t));
+        if (global_stats[i].proc.eval == NULL) {
+            exit(1);
+        }
+        global_stats[i].compute.train =
+            (uint64_t *)calloc(config.NUM_TRAIN_BATCHES_PER_RANK, sizeof(uint64_t));
+        if (global_stats[i].compute.train == NULL) {
+            exit(1);
+        }
+        global_stats[i].compute.eval = (uint64_t *)calloc(config.NUM_EVAL_BATCHES_PER_RANK, sizeof(uint64_t));
+        if (global_stats[i].compute.eval == NULL) {
+            exit(1);
+        }
+
+        MPI_Reduce(stats[i].load.train, global_stats[i].load.train, config.NUM_TRAIN_BATCHES_PER_RANK,
+                   MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
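+        // The remaining metrics are reduced the same way; after the reductions,
+        // rank 0 divides each sum by NUM_RANKS to obtain per-rank averages.
+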
MPI_Reduce(stats[i].load.eval, global_stats[i].load.eval, config.NUM_EVAL_BATCHES_PER_RANK, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(stats[i].proc.train, global_stats[i].proc.train, config.NUM_TRAIN_BATCHES_PER_RANK, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(stats[i].proc.eval, global_stats[i].proc.eval, config.NUM_EVAL_BATCHES_PER_RANK, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(&stats[i].throughput.train, &global_stats[i].throughput.train, 1, MPI_DOUBLE, MPI_SUM, 0, + MPI_COMM_WORLD); + MPI_Reduce(&stats[i].throughput.eval, &global_stats[i].throughput.eval, 1, MPI_DOUBLE, MPI_SUM, 0, + MPI_COMM_WORLD); + MPI_Reduce(stats[i].compute.train, global_stats[i].compute.train, config.NUM_TRAIN_BATCHES_PER_RANK, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(stats[i].compute.eval, global_stats[i].compute.eval, config.NUM_EVAL_BATCHES_PER_RANK, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(&stats[i].observed_time.train, &global_stats[i].observed_time.train, 1, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(&stats[i].observed_time.eval, &global_stats[i].observed_time.eval, 1, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(&stats[i].metadata_time.train, &global_stats[i].metadata_time.train, 1, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(&stats[i].metadata_time.eval, &global_stats[i].metadata_time.eval, 1, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(&stats[i].raw_read_time.train, &global_stats[i].raw_read_time.train, 1, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + MPI_Reduce(&stats[i].raw_read_time.eval, &global_stats[i].raw_read_time.eval, 1, + MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD); + + for (int j = 0; j < config.NUM_TRAIN_BATCHES_PER_RANK; j++) { + global_stats[i].load.train[j] /= NUM_RANKS; + global_stats[i].proc.train[j] /= NUM_RANKS; + global_stats[i].compute.train[j] /= NUM_RANKS; + } + + for (int j = 0; j < config.NUM_EVAL_BATCHES_PER_RANK; j++) { + global_stats[i].load.eval[j] /= NUM_RANKS; + global_stats[i].proc.eval[j] /= NUM_RANKS; + global_stats[i].compute.eval[j] /= NUM_RANKS; + } + + global_stats[i].throughput.train /= NUM_RANKS; + global_stats[i].throughput.eval /= NUM_RANKS; + global_stats[i].observed_time.train /= NUM_RANKS; + global_stats[i].observed_time.eval /= NUM_RANKS; + global_stats[i].metadata_time.train /= NUM_RANKS; + global_stats[i].metadata_time.eval /= NUM_RANKS; + global_stats[i].raw_read_time.train /= NUM_RANKS; + global_stats[i].raw_read_time.eval /= NUM_RANKS; + + // if (config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN > 0) { + // global_stats[i].metadata_time.train /= config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN; + // global_stats[i].raw_read_time.train /= config.NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN; + // } + // if (config.NUM_OF_ACTUALLY_USED_PROCESSES_EVAL > 0) { + // global_stats[i].metadata_time.eval /= config.NUM_OF_ACTUALLY_USED_PROCESSES_EVAL; + // global_stats[i].raw_read_time.eval /= config.NUM_OF_ACTUALLY_USED_PROCESSES_EVAL; + // } + } +} + +// Preparing data obtained during benchmark execution for output +void +print_average_data() +{ + // Train + uint64_t train_total_size_bytes = + (uint64_t)config.BATCH_SIZE * + (config.TOTAL_TRAINING_STEPS == -1 ? 
config.NUM_TRAIN_BATCHES_PER_RANK * NUM_RANKS + : config.TOTAL_TRAINING_STEPS) * + config.RECORD_LENGTH; + uint64_t train_size_bytes_per_rank = train_total_size_bytes / NUM_RANKS; + + uint64_t train_total_compute_time = 0; + uint64_t *train_compute_time_per_epoch = (uint64_t *)malloc(config.EPOCHS * sizeof(uint64_t)); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + unsigned long int compute_time = 0; + for (uint32_t j = 0; j < config.NUM_TRAIN_BATCHES_PER_RANK; j++) { + compute_time += global_stats[i].compute.train[j]; + } + train_total_compute_time += compute_time; + train_compute_time_per_epoch[i] = compute_time; + } + + uint64_t train_total_metadata_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_total_metadata_time += global_stats[i].metadata_time.train; + } + + uint64_t train_total_read_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_total_read_time += global_stats[i].raw_read_time.train; + } + + double train_total_avg_read_rate = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + if (global_stats[i].raw_read_time.train == 0) { + continue; + } + train_total_avg_read_rate += + (double)train_size_bytes_per_rank / global_stats[i].raw_read_time.train * 1000000.0; + } + train_total_avg_read_rate /= config.EPOCHS; + + uint64_t train_total_observed_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_total_observed_time += global_stats[i].observed_time.train; + } + + double train_total_avg_observed_rate = 0.0; + double *train_avg_observed_rate_per_epoch = (double *)malloc(config.EPOCHS * sizeof(double)); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + unsigned long int compute_time = 0; + for (uint32_t j = 0; j < config.NUM_TRAIN_BATCHES_PER_RANK; j++) { + compute_time += global_stats[i].compute.train[j]; + } + if ((global_stats[i].observed_time.train - compute_time) == 0) { + train_avg_observed_rate_per_epoch[i] = NAN; + continue; + } + train_avg_observed_rate_per_epoch[i] = (double)train_size_bytes_per_rank / + (global_stats[i].observed_time.train - compute_time) * + 1000000.0; + train_total_avg_observed_rate += train_avg_observed_rate_per_epoch[i]; + } + train_total_avg_observed_rate /= config.EPOCHS; + + double train_throughput_mean_samples_per_second = 0.0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_throughput_mean_samples_per_second += global_stats[i].throughput.train; + } + train_throughput_mean_samples_per_second = + train_throughput_mean_samples_per_second / (double)config.EPOCHS; + + double train_throughput_stdev_samples_per_second = 0.0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_throughput_stdev_samples_per_second += + (global_stats[i].throughput.train - train_throughput_mean_samples_per_second) * + (global_stats[i].throughput.train - train_throughput_mean_samples_per_second); + } + train_throughput_stdev_samples_per_second = + sqrt(train_throughput_stdev_samples_per_second / (double)config.EPOCHS); + + double train_io_mean = train_throughput_mean_samples_per_second * config.RECORD_LENGTH; + + double train_io_stdev = train_throughput_stdev_samples_per_second * config.RECORD_LENGTH; + + // Evaluation + uint64_t eval_total_size_bytes = (uint64_t)config.NUM_EVAL_BATCHES_PER_RANK * NUM_RANKS * + config.BATCH_SIZE_EVAL * config.RECORD_LENGTH; + uint64_t eval_size_bytes_per_rank = + (uint64_t)config.NUM_EVAL_BATCHES_PER_RANK * config.BATCH_SIZE_EVAL * config.RECORD_LENGTH; + + uint64_t eval_total_compute_time = 0; + uint64_t *eval_compute_time_per_epoch = (uint64_t *)malloc(config.EPOCHS * 
sizeof(uint64_t)); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + unsigned long int compute_time = 0; + for (uint32_t j = 0; j < config.NUM_EVAL_BATCHES_PER_RANK; j++) { + compute_time += global_stats[i].compute.eval[j]; + } + eval_compute_time_per_epoch[i] = compute_time; + eval_total_compute_time += compute_time; + } + + uint64_t eval_total_metadata_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_total_metadata_time += global_stats[i].metadata_time.eval; + } + + uint64_t eval_total_read_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_total_read_time += global_stats[i].raw_read_time.eval; + } + + double eval_total_avg_read_rate = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + if (global_stats[i].raw_read_time.eval == 0) { + continue; + } + eval_total_avg_read_rate += + (double)eval_size_bytes_per_rank / global_stats[i].raw_read_time.eval * 1000000.0; + } + eval_total_avg_read_rate /= config.EPOCHS; + + uint64_t eval_total_observed_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_total_observed_time += global_stats[i].observed_time.eval; + } + + double eval_total_avg_observed_rate = 0.0; + double *eval_avg_observed_rate_per_epoch = (double *)malloc(config.EPOCHS * sizeof(double)); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + unsigned long compute_time = 0; + for (uint32_t j = 0; j < config.NUM_EVAL_BATCHES_PER_RANK; j++) { + compute_time += global_stats[i].compute.eval[j]; + } + if ((global_stats[i].observed_time.eval - compute_time) == 0) { + eval_avg_observed_rate_per_epoch[i] = NAN; + continue; + } + eval_avg_observed_rate_per_epoch[i] = (double)eval_size_bytes_per_rank / + (global_stats[i].observed_time.eval - compute_time) * 1000000.0; + eval_total_avg_observed_rate += eval_avg_observed_rate_per_epoch[i]; + } + eval_total_avg_observed_rate /= config.EPOCHS; + + double eval_throughput_mean_samples_per_second = 0.0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_throughput_mean_samples_per_second += global_stats[i].throughput.eval; + } + eval_throughput_mean_samples_per_second = eval_throughput_mean_samples_per_second / (double)config.EPOCHS; + + double eval_throughput_stdev_samples_per_second = 0.0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_throughput_stdev_samples_per_second += + (global_stats[i].throughput.eval - eval_throughput_mean_samples_per_second) * + (global_stats[i].throughput.eval - eval_throughput_mean_samples_per_second); + } + eval_throughput_stdev_samples_per_second = + sqrt(eval_throughput_stdev_samples_per_second / (double)config.EPOCHS); + + double eval_io_mean = eval_throughput_mean_samples_per_second * config.RECORD_LENGTH; + + double eval_io_stdev = eval_throughput_stdev_samples_per_second * config.RECORD_LENGTH; + + human_readable value; + + printf("\n=================== Performance Results ==================\n"); + printf("Total number of ranks: %d\n", NUM_RANKS); + printf("The number of read threads per rank: %d\n", config.READ_THREADS); + + value = format_human_readable(train_total_size_bytes); + printf("Total training set size: %.3lf %cB\n", value.value, value.unit); + value = format_human_readable(train_size_bytes_per_rank); + printf("Training set size per rank: %.3lf %cB\n", value.value, value.unit); + printf("Total training emulated compute time: %.3lf s\n", train_total_compute_time / 1000000.0); + printf("Training metadata time: %.3lf s\n", train_total_metadata_time / 1000000.0); + printf("Training raw read time: %.3lf s\n", train_total_read_time / 1000000.0); + 
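+ // Note: all accumulated times are stored in microseconds, so every rate below is + // computed as bytes / time_usec * 1e6 (i.e. bytes per second) before + // format_human_readable() converts it into a human-readable unit.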
value = format_human_readable(train_total_avg_read_rate); + printf("Training average raw read rate: %.3f %cB/s\n", value.value, value.unit); + printf("Observed training completion time: %.3lf s\n", train_total_observed_time / 1000000.0); + value = format_human_readable(train_total_avg_observed_rate); + printf("Observed average training rate: %.3f %cB/s\n", value.value, value.unit); + printf("Training average throughput: %.3lf samples/s\n", train_throughput_mean_samples_per_second); + printf("Training throughput standard deviation: %.3lf samples/s\n", + train_throughput_stdev_samples_per_second); + value = format_human_readable(train_io_mean); + printf("Training average IO: %.3f %cB/s\n", value.value, value.unit); + value = format_human_readable(train_io_stdev); + printf("Training IO standard deviation: %.3f %cB/s\n", value.value, value.unit); + + if (config.DO_EVALUATION) { + value = format_human_readable(eval_total_size_bytes); + printf("Total evaluation set size: %.3lf %cB\n", value.value, value.unit); + value = format_human_readable(eval_size_bytes_per_rank); + printf("Evaluation set size per rank: %.3lf %cB\n", value.value, value.unit); + printf("Total evaluation emulated compute time: %.3lf s\n", eval_total_compute_time / 1000000.0); + printf("Evaluation metadata time: %.3lf s\n", eval_total_metadata_time / 1000000.0); + printf("Evaluation raw read time: %.3lf s\n", eval_total_read_time / 1000000.0); + value = format_human_readable(eval_total_avg_read_rate); + printf("Evaluation average raw read rate: %.3lf %cB/s\n", value.value, value.unit); + printf("Observed evaluation completion time: %.3lf s\n", eval_total_observed_time / 1000000.0); + value = format_human_readable(eval_total_avg_observed_rate); + printf("Observed average evaluation rate: %.3lf %cB/s\n", value.value, value.unit); + printf("Evaluation average throughput avg: %.3lf samples/s\n", + eval_throughput_mean_samples_per_second); + printf("Evaluation throughput standard deviation: %.3lf samples/s\n", + eval_throughput_stdev_samples_per_second); + value = format_human_readable(eval_io_mean); + printf("Evaluation average IO: %.3lf %cB/s\n", value.value, value.unit); + value = format_human_readable(eval_io_stdev); + printf("Evaluation IO standard deviation: %.3lf %cB/s\n", value.value, value.unit); + } + + printf("===========================================================\n"); + + char file_name[256]; + snprintf(file_name, sizeof(file_name), "%s/%s.csv", config.OUTPUT_DATA_FOLDER, config.OUTPUT_CSV_NAME); + + FILE *csv_file = fopen(file_name, "w+"); + + char *units = (char *)malloc(config.EPOCHS * sizeof(char)); + + fprintf(csv_file, "metric, value, unit\n"); + fprintf(csv_file, "operation, dlio,\n"); + fprintf(csv_file, "ranks, %d,\n", NUM_RANKS); + fprintf(csv_file, "read threads, %d,\n", config.READ_THREADS); + fprintf(csv_file, "subfiling, %s,\n", config.SUBFILING ? "YES" : "NO"); + fprintf(csv_file, "chunking, %s,\n", config.DO_CHUNKING ? "YES" : "NO"); + fprintf(csv_file, "collective meta, %s,\n", config.COLLECTIVE_META ? "YES" : "NO"); + fprintf(csv_file, "collective data, %s,\n", config.COLLECTIVE_DATA ? 
"YES" : "NO"); + + value = format_human_readable(train_total_size_bytes); + fprintf(csv_file, "train total size, %.3lf, %cB\n", value.value, value.unit); + value = format_human_readable(train_size_bytes_per_rank); + fprintf(csv_file, "train size per rank, %.3lf, %cB\n", value.value, value.unit); + fprintf(csv_file, "train emulated compute time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", train_compute_time_per_epoch[i] / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain emulated compute time, %.3lf, s\n", train_total_compute_time / 1000000.0); + fprintf(csv_file, "train metadata time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", global_stats[i].metadata_time.train / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain metadata time, %.3lf, s\n", train_total_metadata_time / 1000000.0); + fprintf(csv_file, "train raw read time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", global_stats[i].raw_read_time.train / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain total raw read time, %.3lf, s\n", train_total_read_time / 1000000.0); + fprintf(csv_file, "train raw read rate per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + if (global_stats[i].raw_read_time.train == 0) { + units[i] = ' '; + fprintf(csv_file, "NaN"); + } + else { + value = format_human_readable((double)train_size_bytes_per_rank / + global_stats[i].raw_read_time.train * 1000000.0); + units[i] = value.unit; + fprintf(csv_file, "%.3lf", value.value); + } + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%cB/s", units[i]); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + value = format_human_readable(train_total_avg_read_rate); + fprintf(csv_file, "\"\ntrain avg raw read rate, %.3lf, %cB/s\n", value.value, value.unit); + fprintf(csv_file, "train observed time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", global_stats[i].observed_time.train / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain observed time, %.3lf, s\n", train_total_observed_time / 1000000.0); + fprintf(csv_file, "train observed rate per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + value = format_human_readable(train_avg_observed_rate_per_epoch[i]); + units[i] = value.unit; + fprintf(csv_file, "%.3lf", value.value); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + 
fprintf(csv_file, "%cB/s", units[i]); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + value = format_human_readable(train_total_avg_observed_rate); + fprintf(csv_file, "\"\ntrain avg observed rate, %.3lf, %cB/s\n", value.value, value.unit); + fprintf(csv_file, "train throughput samples per second per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", global_stats[i].throughput.train); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "samples/s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain throughput avg samples per second, %.3lf, samples/s\n", + train_throughput_mean_samples_per_second); + fprintf(csv_file, "train throughput stdev samples per second, %.3lf, samples/s\n", + train_throughput_stdev_samples_per_second); + value = format_human_readable(train_io_mean); + fprintf(csv_file, "train io avg, %.3lf, %cB/s\n", value.value, value.unit); + value = format_human_readable(train_io_stdev); + fprintf(csv_file, "train io stdev, %.3lf, %cB/s\n", value.value, value.unit); + + if (config.DO_EVALUATION) { + value = format_human_readable(eval_total_size_bytes); + fprintf(csv_file, "eval total size, %.3lf, %cB\n", value.value, value.unit); + value = format_human_readable(eval_size_bytes_per_rank); + fprintf(csv_file, "eval size per rank, %.3lf, %cB\n", value.value, value.unit); + fprintf(csv_file, "eval emulated compute time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", eval_compute_time_per_epoch[i] / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\neval emulated compute time, %.3lf, s\n", eval_total_compute_time / 1000000.0); + fprintf(csv_file, "eval metadata time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", global_stats[i].metadata_time.eval / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\neval metadata time, %.3lf, s\n", eval_total_metadata_time / 1000000.0); + fprintf(csv_file, "eval raw read time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", global_stats[i].raw_read_time.eval / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + + fprintf(csv_file, "\"\neval total raw read time, %.3lf, s\n", eval_total_read_time / 1000000.0); + fprintf(csv_file, "eval raw read rate per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + if (global_stats[i].raw_read_time.eval == 0) { + units[i] = ' '; + fprintf(csv_file, "NaN"); + } + else { + value = format_human_readable(eval_size_bytes_per_rank / global_stats[i].raw_read_time.eval * + 1000000.0); + units[i] = value.unit; + fprintf(csv_file, "%.3lf", value.value); + } + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 
0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%cB/s", units[i]); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + value = format_human_readable(eval_total_avg_read_rate); + fprintf(csv_file, "\"\neval avg raw read rate, %.3lf, %cB/s\n", value.value, value.unit); + fprintf(csv_file, "eval observed time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", global_stats[i].observed_time.eval / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\neval observed time, %.3lf, s\n", eval_total_observed_time / 1000000.0); + fprintf(csv_file, "eval observed rate per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + value = format_human_readable(eval_avg_observed_rate_per_epoch[i]); + units[i] = value.unit; + fprintf(csv_file, "%.3lf", value.value); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%cB/s", units[i]); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + value = format_human_readable(eval_total_avg_observed_rate); + fprintf(csv_file, "\"\neval avg observed rate, %.3lf, %cB/s\n", value.value, value.unit); + fprintf(csv_file, "eval throughput samples per second per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", global_stats[i].throughput.eval); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "samples/s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\neval throughput avg samples per second, %.3lf, samples/s\n", + eval_throughput_mean_samples_per_second); + fprintf(csv_file, "eval throughput stdev samples per second, %.3lf, samples/s\n", + eval_throughput_stdev_samples_per_second); + value = format_human_readable(eval_io_mean); + fprintf(csv_file, "eval io avg, %.3lf, %cB/s\n", value.value, value.unit); + value = format_human_readable(eval_io_stdev); + fprintf(csv_file, "eval io stdev, %.3lf, %cB/s\n", value.value, value.unit); + } + + fclose(csv_file); + free(units); + free(train_compute_time_per_epoch); + free(eval_compute_time_per_epoch); + free(train_avg_observed_rate_per_epoch); + free(eval_avg_observed_rate_per_epoch); +} + +// Output collected statistics on the current MPI rank +void +print_rank_data() +{ + // Train + uint64_t train_total_size_bytes = + (uint64_t)config.NUM_TRAIN_BATCHES_PER_RANK * NUM_RANKS * config.BATCH_SIZE * config.RECORD_LENGTH; + uint64_t train_size_bytes_per_rank = + (uint64_t)config.NUM_TRAIN_BATCHES_PER_RANK * config.BATCH_SIZE * config.RECORD_LENGTH; + + uint64_t train_total_compute_time = 0; + uint64_t *train_compute_time_per_epoch = (uint64_t *)malloc(config.EPOCHS * sizeof(uint64_t)); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + unsigned long int compute_time = 0; + for (uint32_t j = 0; j < config.NUM_TRAIN_BATCHES_PER_RANK; j++) { + compute_time += stats[i].compute.train[j]; + } + train_total_compute_time += compute_time; + train_compute_time_per_epoch[i] = compute_time; + } + + uint64_t train_total_metadata_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_total_metadata_time += stats[i].metadata_time.train; + } + + uint64_t 
train_total_read_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_total_read_time += stats[i].raw_read_time.train; + } + + double train_total_avg_read_rate = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + if (stats[i].raw_read_time.train == 0) { + continue; + } + train_total_avg_read_rate += + (double)train_size_bytes_per_rank / stats[i].raw_read_time.train * 1000000.0; + } + train_total_avg_read_rate /= config.EPOCHS; + + uint64_t train_total_observed_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_total_observed_time += stats[i].observed_time.train; + } + + double train_total_avg_observed_rate = 0.0; + double *train_avg_observed_rate_per_epoch = (double *)malloc(config.EPOCHS * sizeof(double)); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + unsigned long int compute_time = 0; + for (uint32_t j = 0; j < config.NUM_TRAIN_BATCHES_PER_RANK; j++) { + compute_time += stats[i].compute.train[j]; + } + if ((stats[i].observed_time.train - compute_time) == 0) { + train_avg_observed_rate_per_epoch[i] = NAN; + continue; + } + train_avg_observed_rate_per_epoch[i] = + (double)train_size_bytes_per_rank / (stats[i].observed_time.train - compute_time) * 1000000.0; + train_total_avg_observed_rate += train_avg_observed_rate_per_epoch[i]; + } + train_total_avg_observed_rate /= config.EPOCHS; + + double train_throughput_mean_samples_per_second = 0.0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_throughput_mean_samples_per_second += stats[i].throughput.train; + } + train_throughput_mean_samples_per_second = + train_throughput_mean_samples_per_second / (double)config.EPOCHS; + + double train_throughput_stdev_samples_per_second = 0.0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + train_throughput_stdev_samples_per_second += + (stats[i].throughput.train - train_throughput_mean_samples_per_second) * + (stats[i].throughput.train - train_throughput_mean_samples_per_second); + } + train_throughput_stdev_samples_per_second = + sqrt(train_throughput_stdev_samples_per_second / (double)config.EPOCHS); + + double train_io_mean = train_throughput_mean_samples_per_second * config.RECORD_LENGTH; + + double train_io_stdev = train_throughput_stdev_samples_per_second * config.RECORD_LENGTH; + + // Evaluation + uint64_t eval_total_size_bytes = (uint64_t)config.NUM_EVAL_BATCHES_PER_RANK * NUM_RANKS * + config.BATCH_SIZE_EVAL * config.RECORD_LENGTH; + uint64_t eval_size_bytes_per_rank = + (uint64_t)config.NUM_EVAL_BATCHES_PER_RANK * config.BATCH_SIZE_EVAL * config.RECORD_LENGTH; + + uint64_t eval_total_compute_time = 0; + uint64_t *eval_compute_time_per_epoch = (uint64_t *)malloc(config.EPOCHS * sizeof(uint64_t)); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + unsigned long int compute_time = 0; + for (uint32_t j = 0; j < config.NUM_EVAL_BATCHES_PER_RANK; j++) { + compute_time += stats[i].compute.eval[j]; + } + eval_compute_time_per_epoch[i] = compute_time; + eval_total_compute_time += compute_time; + } + + uint64_t eval_total_metadata_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_total_metadata_time += stats[i].metadata_time.eval; + } + + uint64_t eval_total_read_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_total_read_time += stats[i].raw_read_time.eval; + } + + double eval_total_avg_read_rate = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + if (stats[i].raw_read_time.eval == 0) { + continue; + } + eval_total_avg_read_rate += + (double)eval_size_bytes_per_rank / stats[i].raw_read_time.eval * 1000000.0; + } + 
eval_total_avg_read_rate /= config.EPOCHS; + + uint64_t eval_total_observed_time = 0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_total_observed_time += stats[i].observed_time.eval; + } + + double eval_total_avg_observed_rate = 0.0; + double *eval_avg_observed_rate_per_epoch = (double *)malloc(config.EPOCHS * sizeof(double)); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + unsigned long compute_time = 0; + for (uint32_t j = 0; j < config.NUM_EVAL_BATCHES_PER_RANK; j++) { + compute_time += stats[i].compute.eval[j]; + } + if ((stats[i].observed_time.eval - compute_time) == 0) { + eval_avg_observed_rate_per_epoch[i] = NAN; + continue; + } + eval_avg_observed_rate_per_epoch[i] = + (double)eval_size_bytes_per_rank / (stats[i].observed_time.eval - compute_time) * 1000000.0; + eval_total_avg_observed_rate += eval_avg_observed_rate_per_epoch[i]; + } + eval_total_avg_observed_rate /= config.EPOCHS; + + double eval_throughput_mean_samples_per_second = 0.0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_throughput_mean_samples_per_second += stats[i].throughput.eval; + } + eval_throughput_mean_samples_per_second = eval_throughput_mean_samples_per_second / (double)config.EPOCHS; + + double eval_throughput_stdev_samples_per_second = 0.0; + for (uint32_t i = 0; i < config.EPOCHS; i++) { + eval_throughput_stdev_samples_per_second += + (stats[i].throughput.eval - eval_throughput_mean_samples_per_second) * + (stats[i].throughput.eval - eval_throughput_mean_samples_per_second); + } + eval_throughput_stdev_samples_per_second = + sqrt(eval_throughput_stdev_samples_per_second / (double)config.EPOCHS); + + double eval_io_mean = eval_throughput_mean_samples_per_second * config.RECORD_LENGTH; + + double eval_io_stdev = eval_throughput_stdev_samples_per_second * config.RECORD_LENGTH; + + human_readable value; + + char filename[256]; + snprintf(filename, sizeof(filename), "%s/%d_%s.csv", config.OUTPUT_DATA_FOLDER, MY_RANK, + config.OUTPUT_CSV_NAME); + FILE *csv_file = fopen(filename, "w+"); + + char *units = (char *)malloc(config.EPOCHS * sizeof(char)); + + fprintf(csv_file, "metric, value, unit\n"); + fprintf(csv_file, "operation, dlio,\n"); + fprintf(csv_file, "ranks, %d,\n", NUM_RANKS); + fprintf(csv_file, "read threads, %d,\n", config.READ_THREADS); + fprintf(csv_file, "subfiling, %s,\n", config.SUBFILING ? "YES" : "NO"); + fprintf(csv_file, "chunking, %s,\n", config.DO_CHUNKING ? "YES" : "NO"); + fprintf(csv_file, "collective meta, %s,\n", config.COLLECTIVE_META ? "YES" : "NO"); + fprintf(csv_file, "collective data, %s,\n", config.COLLECTIVE_DATA ? 
"YES" : "NO"); + + value = format_human_readable(train_total_size_bytes); + fprintf(csv_file, "train total size, %.3lf, %cB\n", value.value, value.unit); + value = format_human_readable(train_size_bytes_per_rank); + fprintf(csv_file, "train size per rank, %.3lf, %cB\n", value.value, value.unit); + fprintf(csv_file, "train emulated compute time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", train_compute_time_per_epoch[i] / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain emulated compute time, %.3lf, s\n", train_total_compute_time / 1000000.0); + fprintf(csv_file, "train metadata time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", stats[i].metadata_time.train / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain metadata time, %.3lf, s\n", train_total_metadata_time / 1000000.0); + fprintf(csv_file, "train raw read time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", stats[i].raw_read_time.train / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain total raw read time, %.3lf, s\n", train_total_read_time / 1000000.0); + fprintf(csv_file, "train raw read rate per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + if (stats[i].raw_read_time.train == 0) { + units[i] = ' '; + fprintf(csv_file, "NaN"); + } + else { + value = format_human_readable((double)train_size_bytes_per_rank / stats[i].raw_read_time.train * + 1000000.0); + units[i] = value.unit; + fprintf(csv_file, "%.3lf", value.value); + } + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%cB/s", units[i]); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + value = format_human_readable(train_total_avg_read_rate); + fprintf(csv_file, "\"\ntrain avg raw read rate, %.3lf, %cB/s\n", value.value, value.unit); + fprintf(csv_file, "train observed time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", stats[i].observed_time.train / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain observed time, %.3lf, s\n", train_total_observed_time / 1000000.0); + fprintf(csv_file, "train observed rate per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + value = format_human_readable(train_avg_observed_rate_per_epoch[i]); + units[i] = value.unit; + fprintf(csv_file, "%.3lf", value.value); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%cB/s", 
units[i]); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + value = format_human_readable(train_total_avg_observed_rate); + fprintf(csv_file, "\"\ntrain avg observed rate, %.3lf, %cB/s\n", value.value, value.unit); + fprintf(csv_file, "train throughput samples per second per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", stats[i].throughput.train); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "samples/s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\ntrain throughput avg samples per second, %.3lf, samples/s\n", + train_throughput_mean_samples_per_second); + fprintf(csv_file, "train throughput stdev samples per second, %.3lf, samples/s\n", + train_throughput_stdev_samples_per_second); + value = format_human_readable(train_io_mean); + fprintf(csv_file, "train io avg, %.3lf, %cB/s\n", value.value, value.unit); + value = format_human_readable(train_io_stdev); + fprintf(csv_file, "train io stdev, %.3lf, %cB/s\n", value.value, value.unit); + + value = format_human_readable(eval_total_size_bytes); + fprintf(csv_file, "eval total size, %.3lf, %cB\n", value.value, value.unit); + value = format_human_readable(eval_size_bytes_per_rank); + fprintf(csv_file, "eval size per rank, %.3lf, %cB\n", value.value, value.unit); + fprintf(csv_file, "eval emulated compute time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", eval_compute_time_per_epoch[i] / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\neval emulated compute time, %.3lf, s\n", eval_total_compute_time / 1000000.0); + fprintf(csv_file, "eval metadata time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", stats[i].metadata_time.eval / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\neval metadata time, %.3lf, s\n", eval_total_metadata_time / 1000000.0); + fprintf(csv_file, "eval raw read time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", stats[i].raw_read_time.eval / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + + fprintf(csv_file, "\"\neval total raw read time, %.3lf, s\n", eval_total_read_time / 1000000.0); + fprintf(csv_file, "eval raw read rate per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + if (stats[i].raw_read_time.eval == 0) { + units[i] = ' '; + fprintf(csv_file, "NaN"); + } + else { + value = format_human_readable(eval_size_bytes_per_rank / stats[i].raw_read_time.eval * 1000000.0); + units[i] = value.unit; + fprintf(csv_file, "%.3lf", value.value); + } + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%cB/s", units[i]); + if (i != config.EPOCHS 
- 1) + fprintf(csv_file, ", "); + } + value = format_human_readable(eval_total_avg_read_rate); + fprintf(csv_file, "\"\neval avg raw read rate, %.3lf, %cB/s\n", value.value, value.unit); + fprintf(csv_file, "eval observed time per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", stats[i].observed_time.eval / 1000000.0); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\neval observed time, %.3lf, s\n", eval_total_observed_time / 1000000.0); + fprintf(csv_file, "eval observed rate per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + value = format_human_readable(eval_avg_observed_rate_per_epoch[i]); + units[i] = value.unit; + fprintf(csv_file, "%.3lf", value.value); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%cB/s", units[i]); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + value = format_human_readable(eval_total_avg_observed_rate); + fprintf(csv_file, "\"\neval avg observed rate, %.3lf, %cB/s\n", value.value, value.unit); + fprintf(csv_file, "eval throughput samples per second per epoch, \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "%.3lf", stats[i].throughput.eval); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\", \""); + for (uint32_t i = 0; i < config.EPOCHS; i++) { + fprintf(csv_file, "samples/s"); + if (i != config.EPOCHS - 1) + fprintf(csv_file, ", "); + } + fprintf(csv_file, "\"\neval throughput avg samples per second, %.3lf, samples/s\n", + eval_throughput_mean_samples_per_second); + fprintf(csv_file, "eval throughput stdev samples per second, %.3lf, samples/s\n", + eval_throughput_stdev_samples_per_second); + value = format_human_readable(eval_io_mean); + fprintf(csv_file, "eval io avg, %.3lf, %cB/s\n", value.value, value.unit); + value = format_human_readable(eval_io_stdev); + fprintf(csv_file, "eval io stdev, %.3lf, %cB/s\n", value.value, value.unit); + + fclose(csv_file); + free(units); + free(train_compute_time_per_epoch); + free(eval_compute_time_per_epoch); + free(train_avg_observed_rate_per_epoch); + free(eval_avg_observed_rate_per_epoch); +} + +// Saving the time spent on loading a batch during the training process +void +batch_loaded_train(uint32_t epoch, uint64_t t0) +{ + stats[epoch].load.train[last_load_train[epoch]++] = (get_time_usec_return_uint64() - t0); +} + +// Saving the time spent on processing a batch during the training process +void +batch_processed_train(uint32_t epoch, uint64_t computation_time, uint64_t t0) +{ + stats[epoch].proc.train[last_proc_train[epoch]++] = (get_time_usec_return_uint64() - t0); + stats[epoch].compute.train[last_compute_train[epoch]++] = computation_time; +} + +// Saving the time spent on loading a batch during the evaluation process +void +batch_loaded_eval(uint32_t epoch, uint64_t t0) +{ + stats[epoch].load.eval[last_load_eval[epoch]++] = (get_time_usec_return_uint64() - t0); +} + +// Saving the time spent on processing a batch during the evaluation process +void +batch_processed_eval(uint32_t epoch, uint64_t computation_time, uint64_t t0) +{ + stats[epoch].proc.eval[last_proc_eval[epoch]++] = (get_time_usec_return_uint64() - t0); +
stats[epoch].compute.eval[last_compute_eval[epoch]++] = computation_time; +} + +// Saving the start time of the training process +void +start_train(uint32_t epoch) +{ + stats[epoch].start_time.train = get_time_usec_return_uint64(); +} + +// Saving data on the training process +void +end_train(uint32_t epoch, uint64_t metadata_time, uint64_t read_time) +{ + uint64_t end_time = get_time_usec_return_uint64(); + stats[epoch].observed_time.train = end_time - stats[epoch].start_time.train; + if ((end_time - stats[epoch].start_time.train) == 0) { + stats[epoch].throughput.train = NAN; + } + else { + stats[epoch].throughput.train = + (double)config.BATCH_SIZE * + (config.TOTAL_TRAINING_STEPS_PER_RANK == -1 ? config.NUM_TRAIN_BATCHES_PER_RANK + : config.TOTAL_TRAINING_STEPS_PER_RANK) * + 1000000.0 / (end_time - stats[epoch].start_time.train); + } + stats[epoch].metadata_time.train = metadata_time; + stats[epoch].raw_read_time.train = read_time; +} + +// Saving the start time of the evaluation process +void +start_eval(uint32_t epoch) +{ + stats[epoch].start_time.eval = get_time_usec_return_uint64(); +} + +// Saving data on the evaluation process +void +end_eval(uint32_t epoch, uint64_t metadata_time, uint64_t read_time) +{ + uint64_t end_time = get_time_usec_return_uint64(); + stats[epoch].observed_time.eval = end_time - stats[epoch].start_time.eval; + if ((end_time - stats[epoch].start_time.eval) == 0) { + stats[epoch].throughput.eval = NAN; + } + else { + stats[epoch].throughput.eval = (double)config.NUM_EVAL_BATCHES_PER_RANK * config.BATCH_SIZE_EVAL * + 1000000.0 / (end_time - stats[epoch].start_time.eval); + } + stats[epoch].metadata_time.eval = metadata_time; + stats[epoch].raw_read_time.eval = read_time; +} diff --git a/dlio/stats.h b/dlio/stats.h new file mode 100644 index 00000000..ad55b92c --- /dev/null +++ b/dlio/stats.h @@ -0,0 +1,84 @@ +#ifndef SANDBOX_STATS_H +#define SANDBOX_STATS_H + +#include +#include + +struct load_data { + uint64_t *train; + uint64_t *eval; +}; + +struct proc_data { + uint64_t *train; + uint64_t *eval; +}; + +struct throughput_data { + double train; + double eval; +}; + +struct compute_data { + uint64_t *train; + uint64_t *eval; +}; + +struct start_time_data { + uint64_t train; + uint64_t eval; +}; + +struct observed_time_data { + uint64_t train; + uint64_t eval; +}; + +struct metadata_time_data { + uint64_t train; + uint64_t eval; +}; + +struct raw_read_time_data { + uint64_t train; + uint64_t eval; +}; + +typedef struct epoch_data { + struct start_time_data start_time; + struct load_data load; + struct proc_data proc; + struct throughput_data throughput; + struct compute_data compute; + struct observed_time_data observed_time; + struct metadata_time_data metadata_time; + struct raw_read_time_data raw_read_time; +} epoch_data_t; + +void stats_initialize(); + +void stats_finalize(); + +void prepare_data(); + +void print_average_data(); + +void print_rank_data(); + +void batch_loaded_train(uint32_t epoch, uint64_t start_time); + +void batch_processed_train(uint32_t epoch, uint64_t computation_time, uint64_t start_time); + +void batch_loaded_eval(uint32_t epoch, uint64_t t0); + +void batch_processed_eval(uint32_t epoch, uint64_t computation_time, uint64_t t0); + +void start_train(uint32_t epoch); + +void end_train(uint32_t epoch, uint64_t metadata_time, uint64_t read_time); + +void start_eval(uint32_t epoch); + +void end_eval(uint32_t epoch, uint64_t metadata_time, uint64_t read_time); + +#endif // SANDBOX_STATS_H diff --git a/dlio/utils.c b/dlio/utils.c new file 
mode 100644 index 00000000..e911f042 --- /dev/null +++ b/dlio/utils.c @@ -0,0 +1,323 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <stdbool.h> +#include <math.h> +#include <unistd.h> +#include <dirent.h> +#include <sys/stat.h> +#include <sys/time.h> +#include <mpi.h> + +#include "utils.h" + +// Returns the current time in microseconds +uint64_t +get_time_usec_return_uint64() +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return (uint64_t)1000000 * tv.tv_sec + tv.tv_usec; +} + +config_datatype_t config = { + // Workflow + .DO_DATA_GENERATION = false, + .DO_TRAIN = false, + .DO_EVALUATION = false, + + // Dataset + .RECORD_LENGTH = 67108864, // should be a square number + // .RECORD_LENGTH_STDEV = 0.0f, + // .RECORD_LENGTH_RESIZE = 0.0f, + .NUM_FILES_TRAIN = 32, + .NUM_FILES_EVAL = 8, + .NUM_SAMPLES_PER_FILE = 4, + .DATA_FOLDER = "./data", + // .NUM_SUBFOLDERS_TRAIN = 0, + // .NUM_SUBFOLDERS_EVAL = 0, + .FILE_PREFIX = "img", + .DO_COMPRESSION = false, + .COMPRESSION_LEVEL = 4, + .DO_CHUNKING = false, + .CHUNK_SIZE = 1024, // should be greater than 120 on CLAIX23 + .KEEP_FILES = false, + .COLLECTIVE_META = false, + .COLLECTIVE_DATA = false, + .SUBFILING = false, + + // Reader + .BATCH_SIZE = 7, + .BATCH_SIZE_EVAL = 2, + .READ_THREADS = 4, + // .PREFETCH_SIZE = 0, + .DO_SHUFFLE = false, // sample shuffle vs file_shuffle + // .TRANSFER_SIZE = 262144, + .PREPROCESS_TIME = 0.0f, + .PREPROCESS_TIME_STDEV = 0.000f, + .DROP_LAST = true, + + // Train + .EPOCHS = 5, + .COMPUTATION_TIME = 0.323f, + .COMPUTATION_TIME_STDEV = 0.000f, + .TOTAL_TRAINING_STEPS = -1, + .TOTAL_TRAINING_STEPS_PER_RANK = -1, + .SEED_CHANGE_EPOCH = false, + .RANDOM_SEED = 42, + + // Evaluation + .EVAL_TIME = 0.323f, + .EVAL_TIME_STDEV = 0.000f, + .EPOCHS_BETWEEN_EVALS = 1, + + // Output + .TRAIN_DATA_FOLDER = "train", + .VALID_DATA_FOLDER = "valid", + .RECORDS_DATASET_NAME = "records", + .LABELS_DATASET_NAME = "labels", + .OUTPUT_DATA_FOLDER = "results", + .OUTPUT_CSV_NAME = "output", + .OUTPUT_RANKS_DATA = false, + + // Internal + .NUM_TRAIN_BATCHES_PER_RANK = 0, + .NUM_EVAL_BATCHES_PER_RANK = 0, + .NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN = 0, + .NUM_OF_ACTUALLY_USED_PROCESSES_EVAL = 0, +}; + +// Creating a directory with a specified name +void +create_directory(const char *folder) +{ + struct stat st = {0}; + if (stat(folder, &st) == -1) { + if (mkdir(folder, 0700) != 0) { + perror("Failed to create directory"); + MPI_Abort(MPI_COMM_WORLD, 1); + } + } +} + +// Recursively deleting a directory with a specified name +void +delete_directory(const char *dir_path) +{ + struct dirent *entry; + DIR * dir = opendir(dir_path); + + if (dir == NULL) { + perror("Error opening directory"); + return; + } + + while ((entry = readdir(dir)) != NULL) { + char path[1024]; + snprintf(path, sizeof(path), "%s/%s", dir_path, entry->d_name); + + if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { + continue; + } + + struct stat statbuf; + if (stat(path, &statbuf) == 0) { + if (S_ISDIR(statbuf.st_mode)) { + delete_directory(path); + } + else { + if (remove(path) != 0) { + perror("Error deleting file"); + } + } + } + } + + closedir(dir); + + if (rmdir(dir_path) != 0) { + perror("Error deleting directory"); + } +} + +// Shuffle the values in the specified array +void +shuffle(uint32_t *array, size_t n) +{ + if (n > 1 && array != NULL) { + for (size_t i = n - 1; i > 0; i--) { + size_t j = rand() % (i + 1); + uint32_t temp = array[i]; + array[i] = array[j]; + array[j] = temp; + } + } +} + +// Generation of a normally distributed random number (Box-Muller transform) +double +generate_normal_random(float mean, float stdev) +{ + double u1 =
(double)rand() / RAND_MAX; + double u2 = (double)rand() / RAND_MAX; + double z0 = sqrt(-2.0 * log(u1)) * cos(2.0 * M_PI * u2); + return z0 * stdev + mean; +} + +// Parsing of arguments that the program receives as input +void +parse_args(int argc, char *argv[]) +{ + for (uint32_t i = 1; i < argc; i++) { + if (strcmp(argv[i], "--generate-data") == 0) { + config.DO_DATA_GENERATION = true; + } + else if (strcmp(argv[i], "--train") == 0) { + config.DO_TRAIN = true; + } + else if (strcmp(argv[i], "--evaluation") == 0) { + config.DO_EVALUATION = true; + } + else if (strcmp(argv[i], "--record-length") == 0) { + i++; + config.RECORD_LENGTH = atoi(argv[i]); + } + else if (strcmp(argv[i], "--num-files-train") == 0) { + i++; + config.NUM_FILES_TRAIN = atoi(argv[i]); + } + else if (strcmp(argv[i], "--num-files-eval") == 0) { + i++; + config.NUM_FILES_EVAL = atoi(argv[i]); + } + else if (strcmp(argv[i], "--num-samples-per-file") == 0) { + i++; + config.NUM_SAMPLES_PER_FILE = atoi(argv[i]); + } + else if (strcmp(argv[i], "--data-folder") == 0) { + i++; + config.DATA_FOLDER = argv[i]; + } + else if (strcmp(argv[i], "--file-prefix") == 0) { + i++; + config.FILE_PREFIX = argv[i]; + } + else if (strcmp(argv[i], "--chunking") == 0) { + config.DO_CHUNKING = true; + } + else if (strcmp(argv[i], "--chunk-size") == 0) { + i++; + config.CHUNK_SIZE = atoi(argv[i]); + } + else if (strcmp(argv[i], "--keep-files") == 0) { + config.KEEP_FILES = true; + } + else if (strcmp(argv[i], "--compression") == 0) { + config.DO_COMPRESSION = true; + } + else if (strcmp(argv[i], "--compression-level") == 0) { + i++; + config.COMPRESSION_LEVEL = atoi(argv[i]); + } + else if (strcmp(argv[i], "--batch-size") == 0) { + i++; + config.BATCH_SIZE = atoi(argv[i]); + } + else if (strcmp(argv[i], "--batch-size-eval") == 0) { + i++; + config.BATCH_SIZE_EVAL = atoi(argv[i]); + } + else if (strcmp(argv[i], "--shuffle") == 0) { + config.DO_SHUFFLE = true; + } + else if (strcmp(argv[i], "--preprocess-time") == 0) { + i++; + config.PREPROCESS_TIME = atof(argv[i]); + } + else if (strcmp(argv[i], "--preprocess-time-stdev") == 0) { + i++; + config.PREPROCESS_TIME_STDEV = atof(argv[i]); + } + else if (strcmp(argv[i], "--epochs") == 0) { + i++; + config.EPOCHS = atoi(argv[i]); + } + else if (strcmp(argv[i], "--computation-time") == 0) { + i++; + config.COMPUTATION_TIME = atof(argv[i]); + } + else if (strcmp(argv[i], "--computation-time-stdev") == 0) { + i++; + config.COMPUTATION_TIME_STDEV = atof(argv[i]); + } + else if (strcmp(argv[i], "--random-seed") == 0) { + i++; + config.RANDOM_SEED = atoi(argv[i]); + } + else if (strcmp(argv[i], "--eval-time") == 0) { + i++; + config.EVAL_TIME = atof(argv[i]); + } + else if (strcmp(argv[i], "--eval-time-stdev") == 0) { + i++; + config.EVAL_TIME_STDEV = atof(argv[i]); + } + else if (strcmp(argv[i], "--epochs-between-evals") == 0) { + i++; + config.EPOCHS_BETWEEN_EVALS = atoi(argv[i]); + } + else if (strcmp(argv[i], "--train-data-folder") == 0) { + i++; + config.TRAIN_DATA_FOLDER = argv[i]; + } + else if (strcmp(argv[i], "--valid-data-folder") == 0) { + i++; + config.VALID_DATA_FOLDER = argv[i]; + } + else if (strcmp(argv[i], "--records-dataset-name") == 0) { + i++; + config.RECORDS_DATASET_NAME = argv[i]; + } + else if (strcmp(argv[i], "--labels-dataset-name") == 0) { + i++; + config.LABELS_DATASET_NAME = argv[i]; + } + else if (strcmp(argv[i], "--seed-change-epoch") == 0) { + config.SEED_CHANGE_EPOCH = true; + } + else if (strcmp(argv[i], "--read-threads") == 0) { + i++; + config.READ_THREADS = atoi(argv[i]); 
+ } + else if (strcmp(argv[i], "--collective-meta") == 0) { + config.COLLECTIVE_META = true; + } + else if (strcmp(argv[i], "--collective-data") == 0) { + config.COLLECTIVE_DATA = true; + } + else if (strcmp(argv[i], "--subfiling") == 0) { + config.SUBFILING = true; + } + else if (strcmp(argv[i], "--drop-last") == 0) { + config.DROP_LAST = true; + } + else if (strcmp(argv[i], "--output-data-folder") == 0) { + i++; + config.OUTPUT_DATA_FOLDER = argv[i]; + } + else if (strcmp(argv[i], "--output-csv-name") == 0) { + i++; + config.OUTPUT_CSV_NAME = argv[i]; + } + else if (strcmp(argv[i], "--output-ranks-data") == 0) { + config.OUTPUT_RANKS_DATA = true; + } + else if (strcmp(argv[i], "--total-training-steps") == 0) { + i++; + config.TOTAL_TRAINING_STEPS = atoi(argv[i]); + } + else { + printf("WARNING: %s not found\n", argv[i]); + } + } +} \ No newline at end of file diff --git a/dlio/utils.h b/dlio/utils.h new file mode 100644 index 00000000..56287f9b --- /dev/null +++ b/dlio/utils.h @@ -0,0 +1,91 @@ +#ifndef SANDBOX_UTILS_H +#define SANDBOX_UTILS_H + +#include +#include +#include + +uint64_t get_time_usec_return_uint64(); + +typedef struct config_datatype { + // Workflow + bool DO_DATA_GENERATION; + bool DO_TRAIN; + bool DO_EVALUATION; + + // Dataset + uint32_t RECORD_LENGTH; // should be a square number + // float RECORD_LENGTH_STDEV; + // float RECORD_LENGTH_RESIZE; + uint32_t NUM_FILES_TRAIN; + uint32_t NUM_FILES_EVAL; + uint32_t NUM_SAMPLES_PER_FILE; + char * DATA_FOLDER; + // unsigned int NUM_SUBFOLDERS_TRAIN; + // unsigned int NUM_SUBFOLDERS_EVAL; + char * FILE_PREFIX; + bool DO_COMPRESSION; + uint32_t COMPRESSION_LEVEL; + bool DO_CHUNKING; + uint32_t CHUNK_SIZE; // should be a square number + bool KEEP_FILES; + bool COLLECTIVE_META; + bool COLLECTIVE_DATA; + bool SUBFILING; + + // Reader + // DATA_LOADER; + uint32_t BATCH_SIZE; + uint32_t BATCH_SIZE_EVAL; + uint32_t READ_THREADS; + // int COMPUTATION_THREADS; + // unsigned int PREFETCH_SIZE; + bool DO_SHUFFLE; // sample shuffle vs file_shuffle + // unsigned int TRANSFER_SIZE; + float PREPROCESS_TIME; + float PREPROCESS_TIME_STDEV; + bool DROP_LAST; + + // Train + uint32_t EPOCHS; + float COMPUTATION_TIME; + float COMPUTATION_TIME_STDEV; + uint32_t TOTAL_TRAINING_STEPS; + uint32_t TOTAL_TRAINING_STEPS_PER_RANK; + bool SEED_CHANGE_EPOCH; + int RANDOM_SEED; + + // Evaluation + float EVAL_TIME; + float EVAL_TIME_STDEV; + uint32_t EPOCHS_BETWEEN_EVALS; + + // Output + char *TRAIN_DATA_FOLDER; + char *VALID_DATA_FOLDER; + char *RECORDS_DATASET_NAME; + char *LABELS_DATASET_NAME; + char *OUTPUT_DATA_FOLDER; + char *OUTPUT_CSV_NAME; + bool OUTPUT_RANKS_DATA; + + // Internal + uint32_t NUM_TRAIN_BATCHES_PER_RANK; + uint32_t NUM_EVAL_BATCHES_PER_RANK; + uint32_t NUM_OF_ACTUALLY_USED_PROCESSES_TRAIN; + uint32_t NUM_OF_ACTUALLY_USED_PROCESSES_EVAL; +} config_datatype_t; + +extern config_datatype_t config; + +void shuffle(uint32_t *array, size_t n); + +double generate_normal_random(float mean, float stdev); + +void create_directory(const char *folder); + +void delete_directory(const char *dir_path); + +void parse_args(int argc, char *argv[]); + +#endif // SANDBOX_UTILS_H diff --git a/dlio/workers.c b/dlio/workers.c new file mode 100644 index 00000000..5b6ba6a9 --- /dev/null +++ b/dlio/workers.c @@ -0,0 +1,217 @@ +// TODO: handle errors in child processes + +#include +#include +#include +#include +#include + +#include "h5bench_dlio.h" +#include "workers.h" +#include "utils.h" + +int pipe_train_task_fd[2], pipe_train_result_fd[2], pipe_eval_task_fd[2], 
pipe_eval_result_fd[2]; +int pipe_train_system_fd[2], pipe_eval_system_fd[2]; + +// Initialization of the processes that are later used to simulate data loading +void +init_workers(uint32_t *indices_train, uint32_t *indices_eval) +{ + if ((pipe(pipe_train_system_fd) == -1) || (pipe(pipe_train_task_fd) == -1) || + (pipe(pipe_train_result_fd) == -1)) { + perror("pipe"); + exit(EXIT_FAILURE); + } + + for (uint32_t i = 0; i < config.READ_THREADS; i++) { + pid_t pid = fork(); + if (pid == -1) { + perror("fork"); + exit(EXIT_FAILURE); + } + else if (pid == 0) { + close(pipe_train_task_fd[1]); + close(pipe_train_result_fd[0]); + close(pipe_train_system_fd[1]); + + run_worker(indices_train, pipe_train_task_fd, pipe_train_result_fd, pipe_train_system_fd, true); + + close(pipe_train_task_fd[0]); + close(pipe_train_result_fd[1]); + close(pipe_train_system_fd[0]); + exit(EXIT_SUCCESS); + } + } + + if (config.DO_EVALUATION) { + if ((pipe(pipe_eval_system_fd) == -1) || (pipe(pipe_eval_task_fd) == -1) || + (pipe(pipe_eval_result_fd) == -1)) { + perror("pipe"); + exit(EXIT_FAILURE); + } + + for (uint32_t i = 0; i < config.READ_THREADS; i++) { + pid_t pid = fork(); + if (pid == -1) { + perror("fork"); + exit(EXIT_FAILURE); + } + else if (pid == 0) { + close(pipe_eval_task_fd[1]); + close(pipe_eval_result_fd[0]); + close(pipe_eval_system_fd[1]); + + run_worker(indices_eval, pipe_eval_task_fd, pipe_eval_result_fd, pipe_eval_system_fd, false); + + close(pipe_eval_task_fd[0]); + close(pipe_eval_result_fd[1]); + close(pipe_eval_system_fd[0]); + exit(EXIT_SUCCESS); + } + } + + close(pipe_eval_task_fd[0]); + close(pipe_eval_result_fd[1]); + close(pipe_eval_system_fd[0]); + } + + close(pipe_train_task_fd[0]); + close(pipe_train_result_fd[1]); + close(pipe_train_system_fd[0]); +} + +// Returns the file descriptor opened for reading and used to communicate with training workers +int +get_train_read_fd() +{ + return pipe_train_result_fd[0]; +} + +// Returns the file descriptor opened for reading and used to communicate with evaluation workers +int +get_eval_read_fd() +{ + return pipe_eval_result_fd[0]; +} + +// Returns the file descriptor opened for writing and used to communicate with training workers +int +get_train_write_fd() +{ + return pipe_train_task_fd[1]; +} + +// Returns the file descriptor opened for writing and used to communicate with evaluation workers +int +get_eval_write_fd() +{ + return pipe_eval_task_fd[1]; +} + +// Returns the file descriptor opened for writing and used to manage the training workers +int +get_train_system_fd() +{ + return pipe_train_system_fd[1]; +} + +// Returns the file descriptor opened for writing and used to manage the evaluation workers +int +get_eval_system_fd() +{ + return pipe_eval_system_fd[1]; +} + +// Close the worker pipes and wait for the worker processes themselves +void +fin_workers() +{ + close(pipe_train_task_fd[1]); + close(pipe_train_result_fd[0]); + close(pipe_train_system_fd[1]); + + // The evaluation pipes exist only when evaluation is enabled (see init_workers) + if (config.DO_EVALUATION) { + close(pipe_eval_task_fd[1]); + close(pipe_eval_result_fd[0]); + close(pipe_eval_system_fd[1]); + } + + for (uint32_t i = 0; i < config.READ_THREADS; i++) { + wait(NULL); + } + + if (config.DO_EVALUATION) { + for (uint32_t i = 0; i < config.READ_THREADS; i++) { + wait(NULL); + } + } +} + +// Command all workers to shuffle data files +void +force_workers_to_shuffle(int read_fd, int write_fd, int system_fd) +{ + int32_t shuffle_code = -1; + for (uint32_t i = 0; i < config.READ_THREADS; i++) { + write(write_fd, &shuffle_code,
sizeof(shuffle_code)); + } + + for (uint32_t i = 0; i < config.READ_THREADS; i++) { + read(read_fd, &shuffle_code, sizeof(shuffle_code)); + } + + for (uint32_t i = 0; i < config.READ_THREADS; i++) { + write(system_fd, &shuffle_code, sizeof(shuffle_code)); + } +} + +// Starting a worker waiting for commands to read data batches +void +run_worker(uint32_t *indices, int pipe_task_fd[2], int pipe_result_fd[2], int pipe_system_fd[2], + bool is_train_worker) +{ + int32_t batch = 0, current_epoch = 0; + while (read(pipe_task_fd[0], &batch, sizeof(batch)) > 0) { + // A new epoch has begun + if (batch == -1) { + if (config.SEED_CHANGE_EPOCH) { + srand(config.RANDOM_SEED * (is_train_worker ? 1 : 2) + current_epoch); + } + if (config.DO_SHUFFLE) { + shuffle(indices, config.NUM_SAMPLES_PER_FILE * + (is_train_worker ? config.NUM_FILES_TRAIN : config.NUM_FILES_EVAL)); + } + current_epoch++; + write(pipe_result_fd[1], &batch, sizeof(batch)); + read(pipe_system_fd[0], &batch, sizeof(batch)); + continue; + } + + uint32_t read_from = batch * (is_train_worker ? config.BATCH_SIZE : config.BATCH_SIZE_EVAL); + uint32_t read_to = (batch + 1) * (is_train_worker ? config.BATCH_SIZE : config.BATCH_SIZE_EVAL); + uint64_t process_metadata_time = 0, process_read_time = 0; + + for (uint32_t i = read_from; i < read_to; i++) { + uint32_t file_num = indices[i] / config.NUM_SAMPLES_PER_FILE + 1; + uint32_t sample_num = indices[i] % config.NUM_SAMPLES_PER_FILE; + char file_path[256]; + snprintf(file_path, sizeof(file_path), "%s/%s/%s_%u_of_%u.h5", config.DATA_FOLDER, + is_train_worker ? config.TRAIN_DATA_FOLDER : config.VALID_DATA_FOLDER, + config.FILE_PREFIX, file_num, + is_train_worker ? config.NUM_FILES_TRAIN : config.NUM_FILES_EVAL); + + uint64_t metadata_time = 0, read_time = 0; + read_sample(file_path, sample_num, &metadata_time, &read_time); + + process_metadata_time += metadata_time; + process_read_time += read_time; + } + + execution_time_t data = { + .metadata_time = process_metadata_time, + .read_time = process_read_time, + }; + + write(pipe_result_fd[1], &data, sizeof(data)); + } +} diff --git a/dlio/workers.h b/dlio/workers.h new file mode 100644 index 00000000..31933616 --- /dev/null +++ b/dlio/workers.h @@ -0,0 +1,33 @@ +#ifndef H5BENCH_WORKERS_H +#define H5BENCH_WORKERS_H + +#include +#include + +typedef struct execution_time { + uint64_t metadata_time; + uint64_t read_time; +} execution_time_t; + +void init_workers(uint32_t *indices_train, uint32_t *indices_eval); + +int get_train_read_fd(); + +int get_eval_read_fd(); + +int get_train_write_fd(); + +int get_eval_write_fd(); + +int get_train_system_fd(); + +int get_eval_system_fd(); + +void fin_workers(); + +void force_workers_to_shuffle(int read_fd, int write_fd, int system_fd); + +void run_worker(uint32_t *indices, int pipe_task_fd[2], int pipe_result_fd[2], int pipe_system_fd[2], + bool is_train_worker); + +#endif // H5BENCH_WORKERS_H diff --git a/docs/source/buildinstructions.rst b/docs/source/buildinstructions.rst index 9cebf08c..a6af0d2c 100644 --- a/docs/source/buildinstructions.rst +++ b/docs/source/buildinstructions.rst @@ -104,7 +104,8 @@ Exerciser ``h5bench_exerciser`` ``-DH5BENCH_EXERCISER=ON`` OpenPMD (write) ``h5bench_openpmd_write`` ``-DH5BENCH_OPENPMD=ON`` OpenPMD (read) ``h5bench_openpmd_read`` ``-DH5BENCH_OPENPMD=ON`` E3SM-IO ``h5bench_e3sm`` ``-DH5BENCH_E3SM=ON`` -MACSio ``h5bench_macsio`` ``-DH5BENCH_MACSIO=ON`` +MACSio ``h5bench_macsio`` ``-DH5BENCH_MACSIO=ON`` +DLIO ``h5bench_dlio`` ``-DH5BENCH_DLIO=ON`` ==================== 
=========================== =============================== .. warning:: diff --git a/docs/source/dlio.rst b/docs/source/dlio.rst new file mode 100644 index 00000000..afff6a22 --- /dev/null +++ b/docs/source/dlio.rst @@ -0,0 +1,137 @@ +DLIO +==== + +This benchmark measures the I/O performance of training and evaluating deep learning models on data stored +as HDF5 files. Based on I/O patterns collected and analysed from the `DLIO Benchmark `_, +it simulates the training and evaluation of deep learning models that use the PyTorch and TensorFlow +frameworks, while gathering information about system performance. Most importantly, this extension allows users +to test AI workloads without having to install machine learning libraries, which reduces complexity and enhances the +usability of the benchmark. Another advantage is that, in our experiments, this extension ran faster than the DLIO Benchmark, +which we attribute to the lower overhead of our C implementation compared to the +Python implementation of the original benchmark. While the shorter runtime is beneficial for faster testing, it also +suggests that the benchmark might not fully capture the complexity of real AI workloads, such as the heavy metadata +load introduced by the use of Python-based libraries. The I/O pattern produced by this extension is based on the +implementation of `DLIO benchmark version 1.1 `_. +Changes made to the main DLIO Benchmark configuration after version 1.1 are not reflected in this h5bench pattern. To +reproduce them, DLIO Benchmark behavior can be studied using various I/O analysis tools; we recommend the +`Log VFD `_. + +Configuration +------------- + +As with the other extensions, the following parameters should be specified in the ``configuration`` section of the JSON file to configure the benchmark: + +========================== ===================================================================== ======== ============== +**Parameter** **Description** **Type** **Default** +========================== ===================================================================== ======== ============== +``generate-data`` Enable generation of benchmarking data bool false +``train`` Enable model training simulation bool false +``evaluation`` Enable model evaluation simulation bool false +``record-length`` Record size of a single sample in bytes int 67108864 +``num-files-train`` The number of files used to train the model int 32 +``num-files-eval`` The number of files used to evaluate the model int 8 +``num-samples-per-file`` The number of samples in each file int 4 +``data-folder`` Name of the directory storing the benchmark data string ./data +``file-prefix`` Prefix in the name of files containing training and evaluation data string img +``chunking`` Enable chunking bool false +``chunk-size`` Chunk size int 1024 +``keep-files`` Does not delete data after the benchmark is finished bool false +``compression`` Enable compression bool false +``compression-level`` Compression level from 1 to 9 int 4 +``batch-size`` Training batch size int 7 +``batch-size-eval`` Evaluation batch size int 2 +``shuffle`` Enable samples shuffle bool false +``preprocess-time`` Preprocessing time after reading each sample in seconds float 0.0 +``preprocess-time-stdev`` Standard deviation in preprocessing time in seconds float 0.0 +``epochs`` The number of epochs int 5 +``total-training-steps`` Maximum number of steps per training per epoch int -1 +``computation-time``
+``computation-time``       Computation time after reading each batch in seconds                 float    0.323
+``computation-time-stdev`` Standard deviation in computation time in seconds                    float    0.0
+``random-seed``            Random seed to be used                                               int      42
+``eval-time``              Evaluation time after reading each batch in seconds                  float    0.323
+``eval-time-stdev``        Standard deviation in evaluation time in seconds                     float    0.0
+``epochs-between-evals``   The number of epochs between evaluations                             int      1
+``train-data-folder``      Name of the directory containing the training data                   string   train
+``valid-data-folder``      Name of the directory containing the validation data                 string   valid
+``records-dataset-name``   Name of the dataset with records                                     string   records
+``labels-dataset-name``    Name of the dataset with labels                                      string   labels
+``seed-change-epoch``      Enable seed changes every epoch                                      bool     false
+``read-threads``           The number of workers used to read the data                          int      4
+``collective-meta``        Enable collective HDF5 metadata operations                           bool     false
+``collective-data``        Enable collective HDF5 data operations                               bool     false
+``subfiling``              Enable HDF5 Subfiling Virtual File Driver                            bool     false
+``output-csv-name``        Name of the output CSV file                                          string   output
+``output-ranks-data``      Enable statistics output for each rank                               bool     false
+========================== ===================================================================== ======== ==============
+
+Each parameter has a default value that applies if it is not specified in the configuration file. Note
+that with only the defaults the benchmark does not run anything, because ``generate-data``, ``train``,
+and ``evaluation`` are all false. A sample configuration file can be found in the ``samples/`` directory.
+
+Understanding the output
+------------------------
+
+A sample output of the benchmark is as follows:
+
+.. code-block::
+
+   =================== Performance Results ==================
+   Total number of ranks: 8
+   The number of read threads per rank: 0
+   Total training set size: 7.000 GB
+   Training set size per rank: 896.000 MB
+   Total training emulated compute time: 3.229 s
+   Training metadata time: 2.808 s
+   Training raw read time: 30.905 s
+   Training average raw read rate: 145.141 MB/s
+   Observed training completion time: 37.432 s
+   Observed average training rate: 131.044 MB/s
+   Training average throughput: 1.871 samples/s
+   Training throughput standard deviation: 0.037 samples/s
+   Training average IO: 119.729 MB/s
+   Training IO standard deviation: 2.379 MB/s
+   Total evaluation set size: 7.000 GB
+   Evaluation set size per rank: 896.000 MB
+   Total evaluation emulated compute time: 3.206 s
+   Evaluation metadata time: 2.805 s
+   Evaluation raw read time: 31.699 s
+   Evaluation average raw read rate: 141.906 MB/s
+   Observed evaluation completion time: 38.424 s
+   Observed average evaluation rate: 127.595 MB/s
+   Evaluation average throughput avg: 1.826 samples/s
+   Evaluation throughput standard deviation: 0.090 samples/s
+   Evaluation average IO: 116.883 MB/s
+   Evaluation IO standard deviation: 5.735 MB/s
+   ===========================================================
+
+Let's take a closer look. The first two lines report the number of MPI ranks and the number of read
+threads per rank used in the simulation. The same set of metrics is then reported for training and for
+evaluation, so to avoid redundancy we discuss only the training half of the results. Total training set
+size is the combined size of all HDF5 files used for training. Accordingly, the training set size per
+rank shows how much of the load is handled by a single MPI rank. Total training emulated compute time
+is the time spent on compute emulation summed over all epochs; likewise, training metadata time and
+training raw read time are accumulated totals. The latter two are not interleaved: they measure the
+execution time of the metadata operations (``H5Fopen``, ``H5Dget_space``, ``H5Screate_simple``,
+``H5Sclose``) and of ``H5Dread``, respectively. Training average raw read rate is the training set size
+per rank divided by the per-epoch raw read time. Observed training completion time includes all the
+time spent in the training process, including, among other things, resource allocation and computation
+emulation. Observed average training rate is the training set size per rank divided by the per-epoch
+difference between the observed training completion time and the total training emulated compute time,
+thus showing the read rate without the emulation costs. Training average throughput and training
+throughput standard deviation estimate the number of training samples processed per second. Training
+average IO and training IO standard deviation translate those values into bytes per second by
+multiplying them by the size of one sample.
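As a cross-check, the reported training rates can be reconstructed from the sample output. A minimal sketch, assuming the run above used the default five epochs (the reported times are totals over all epochs, while the set size per rank is per epoch):

.. code-block:: c

   #include <stdio.h>

   int main(void)
   {
       const double epochs           = 5.0;     // assumed: the default value
       const double size_per_rank_mb = 896.0;   // training set size per rank
       const double raw_read_s       = 30.905;  // total over all epochs
       const double completion_s     = 37.432;  // total over all epochs
       const double compute_s        = 3.229;   // total emulated compute time

       // Average raw read rate: per-rank size over per-epoch raw read time
       printf("%.1f MB/s\n", size_per_rank_mb / (raw_read_s / epochs));                  // ~145.0

       // Observed average rate: excludes the emulated compute time
       printf("%.1f MB/s\n", size_per_rank_mb / ((completion_s - compute_s) / epochs));  // ~131.0

       // Average IO: throughput (samples/s) times one 64 MiB (default) sample
       printf("%.1f MB/s\n", 1.871 * 64.0);                                              // ~119.7

       return 0;
   }

These match the reported 145.141 MB/s, 131.044 MB/s, and 119.729 MB/s up to rounding of the displayed values.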
+Future work
+-----------
+
+There are plans to add more configuration options to the extension in the future to increase its
+flexibility:
+
+* Add settings for the Subfiling VFD. Currently, the default settings are used.
+* Add more features from the `DLIO Benchmark `_, such as resizable records.
+* Analyze and add support for other ML frameworks and data loaders, for example DALI.
+* Add support for prefetching.
+* Expand the ability to randomly shuffle samples. At the moment, it is not possible to shuffle only the
+  samples within each file without also changing the order of the files used for training.
+* Add more compression filters and thus support different compression algorithms for HDF5 data.
+* Add support for ``drop_last`` customization. Currently, by default, all batches left over after
+  distribution across the MPI ranks are not processed.
+* Replace the use of ``fork()`` with ``MPI_Comm_spawn()`` when creating new processes, as using
+  ``fork()`` with MPI may be unsafe.
+* Test support for the Cache VOL connector.
+* Add support for checkpointing by saving the model to an HDF5 file.
diff --git a/docs/source/index.rst b/docs/source/index.rst
index bb3d56a0..ca8e47df 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -35,7 +35,8 @@
 Exerciser            ``h5bench_exerciser``       |:white_check_mark:| |:white_large_square:| |:white_large_square:| |:white_large_square:|
 OpenPMD (write)      ``h5bench_openpmd_write``   |:white_check_mark:| |:white_large_square:| |:white_large_square:| |:white_large_square:|
 OpenPMD (read)       ``h5bench_openpmd_read``    |:white_check_mark:| |:white_large_square:| |:white_large_square:| |:white_large_square:|
 E3SM-IO              ``h5bench_e3sm``            |:white_check_mark:| |:white_large_square:| |:white_large_square:| |:white_check_mark:|
-MACSio               ``h5bench_macsio``          |:white_check_mark:| |:white_large_square:| |:white_large_square:| |:white_check_mark:|
+MACSio               ``h5bench_macsio``          |:white_check_mark:| |:white_large_square:| |:white_large_square:| |:white_check_mark:|
+DLIO                 ``h5bench_dlio``            |:white_check_mark:| |:white_large_square:| |:white_large_square:| |:white_large_square:|
 ==================== =========================== ==================== ======================== ======================== ========================
 ..
 toctree::
@@ -56,6 +57,7 @@ MACSio               ``h5bench_macsio``          |:white_check_mark:| |:white_large_square:|
    openpmd
    e3sm
    macsio
+   dlio

 .. toctree::
    :maxdepth: 2
diff --git a/samples/sync-dlio.json b/samples/sync-dlio.json
new file mode 100644
index 00000000..ec25d3b7
--- /dev/null
+++ b/samples/sync-dlio.json
@@ -0,0 +1,75 @@
+{
+    "mpi": {
+        "command": "mpirun",
+        "ranks": "4",
+        "configuration": "--allow-run-as-root --oversubscribe -np 4"
+    },
+    "vol": {
+
+    },
+    "file-system": {
+
+    },
+    "directory": "storage",
+    "benchmarks": [
+        {
+            "benchmark": "dlio",
+            "configuration": {
+                "generate-data": "true",
+                "chunking": "false",
+                "keep-files": "true",
+                "compression": "false",
+                "record-length": "1048576",
+                "num-files-train": "8",
+                "num-files-eval": "2",
+                "num-samples-per-file": "4",
+                "data-folder": "data",
+                "file-prefix": "img",
+                "random-seed": "42",
+                "train-data-folder": "train",
+                "valid-data-folder": "valid",
+                "records-dataset-name": "records",
+                "labels-dataset-name": "labels",
+                "output-csv-name": "output",
+                "output-ranks-data": "true"
+            }
+        },
+        {
+            "benchmark": "dlio",
+            "configuration": {
+                "train": "true",
+                "evaluation": "true",
+                "keep-files": "true",
+                "shuffle": "true",
+                "seed-change-epoch": "true",
+                "record-length": "1048576",
+                "num-files-train": "8",
+                "num-files-eval": "2",
+                "num-samples-per-file": "4",
+                "data-folder": "data",
+                "file-prefix": "img",
+                "batch-size": "2",
+                "batch-size-eval": "1",
+                "read-threads": "1",
+                "preprocess-time": "0.0",
+                "preprocess-time-stdev": "0.0",
+                "epochs": "1",
+                "computation-time": "0.123",
+                "computation-time-stdev": "0.0",
+                "random-seed": "42",
+                "eval-time": "0.123",
+                "eval-time-stdev": "0.0",
+                "epochs-between-evals": "1",
+                "train-data-folder": "train",
+                "valid-data-folder": "valid",
+                "records-dataset-name": "records",
+                "labels-dataset-name": "labels",
+                "collective-meta": "true",
+                "collective-data": "true",
+                "subfiling": "false",
+                "output-csv-name": "output",
+                "output-ranks-data": "true"
+            }
+        }
+    ]
+}
\ No newline at end of file
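As a quick sanity check of what this sample will do, the dataset footprint and per-epoch step count implied by the configuration above can be computed by hand. A minimal sketch (hypothetical helper, not part of h5bench; it assumes batches left over after distribution across ranks are dropped, as noted in the documentation):

.. code-block:: c

   #include <stdio.h>
   #include <stdint.h>

   int main(void)
   {
       // Values from samples/sync-dlio.json (second benchmark entry)
       const uint64_t record_length    = 1048576; // bytes per sample (1 MiB)
       const uint64_t num_files_train  = 8;
       const uint64_t samples_per_file = 4;
       const uint64_t batch_size       = 2;
       const uint64_t ranks            = 4;

       uint64_t total_samples   = num_files_train * samples_per_file;   // 32 samples
       uint64_t total_bytes     = total_samples * record_length;        // 32 MiB
       uint64_t steps_per_epoch = total_samples / (batch_size * ranks); // 4 steps

       printf("training set size: %llu MiB\n", (unsigned long long)(total_bytes >> 20));
       printf("steps per epoch (all ranks): %llu\n", (unsigned long long)steps_per_epoch);
       return 0;
   }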
diff --git a/src/h5bench.py b/src/h5bench.py
index b50b7850..66a65278 100755
--- a/src/h5bench.py
+++ b/src/h5bench.py
@@ -34,6 +34,7 @@ class H5bench:
     H5BENCH_OPENPMD_READ = 'h5bench_openpmd_read'
     H5BENCH_E3SM = 'h5bench_e3sm'
     H5BENCH_MACSIO = 'h5bench_macsio'
+    H5BENCH_DLIO = 'h5bench_dlio'

     def __init__(self, setup, prefix=None, debug=None, abort=None, validate=None, filter=None):
         """Initialize the suite."""
@@ -245,6 +246,8 @@ def run(self):
                 self.run_e3sm(id, benchmark)
             elif name == 'macsio':
                 self.run_macsio(id, benchmark)
+            elif name == 'dlio':
+                self.run_dlio(id, benchmark)
             else:
                 self.logger.critical('{} - Unsupported benchmark/kernel')

@@ -952,6 +955,93 @@ def run_macsio(self, id, setup):
         except Exception as e:
             self.logger.error('Unable to run the benchmark: %s', e)

+    def run_dlio(self, id, setup):
+        """Run the DLIO benchmark."""
+        if not self.is_available(self.H5BENCH_DLIO):
+            self.logger.critical('{} is not available'.format(self.H5BENCH_DLIO))
+
+            sys.exit(os.EX_UNAVAILABLE)
+
+        try:
+            start = time.time()
+
+            configuration = setup['configuration']
+
+            parameters = []
+
+            parameters_binary = [
+                'generate-data',
+                'train',
+                'evaluation',
+                'chunking',
+                'keep-files',
+                'compression',
+                'shuffle',
+                'seed-change-epoch',
+                'collective-meta',
+                'collective-data',
+                'subfiling',
+                'output-ranks-data',
+            ]
+
+            # Create the configuration parameter list
+            for key in configuration:
+                if key in parameters_binary:
+                    if configuration[key].lower() == 'true':
+                        parameters.append('--{} '.format(key))
+
+                # Make sure datasets are generated in the temporary path
+                elif key == 'data-folder':
+                    parameters.append('--{} {} '.format(key, '{}/{}'.format(self.directory, configuration[key])))
+                else:
+                    parameters.append('--{} {} '.format(key, configuration[key]))
+
+            parameters.append('--output-data-folder {} '.format('{}/{}'.format(self.directory, id)))
+
+            if self.prefix:
+                benchmark_path = self.prefix + '/' + self.H5BENCH_DLIO
+            else:
+                if os.path.isfile(h5bench_configuration.__install__ + '/' + self.H5BENCH_DLIO):
+                    benchmark_path = h5bench_configuration.__install__ + '/' + self.H5BENCH_DLIO
+                else:
+                    benchmark_path = self.H5BENCH_DLIO
+
+            command = '{} {} {}'.format(
+                self.mpi,
+                benchmark_path,
+                ' '.join(parameters)
+            )
+
+            self.logger.info(command)
+
+            # Make sure the command line is in the correct format
+            arguments = shlex.split(command)
+
+            stdout_file_name = '{}/{}/stdout'.format(self.directory, id)
+            stderr_file_name = '{}/{}/stderr'.format(self.directory, id)
+
+            with open(stdout_file_name, mode='w') as stdout_file, open(stderr_file_name, mode='w') as stderr_file:
+                s = subprocess.Popen(arguments, stdout=stdout_file, stderr=stderr_file, env=self.vol_environment)
+                s.communicate()
+
+                if s.returncode == 0 and not self.check_for_hdf5_error(stderr_file_name):
+                    self.logger.info('SUCCESS (all output files are located at %s/%s)', self.directory, id)
+                else:
+                    self.logger.error('Return: %s (check %s for detailed log)', s.returncode, stderr_file_name)
+
+                    if self.abort:
+                        self.logger.critical('h5bench execution aborted upon first error')
+
+                        sys.exit(os.EX_SOFTWARE)
+
+            end = time.time()
+
+            self.logger.info('Runtime: {:.7f} seconds (elapsed time, includes allocation wait time)'.format(end - start))
+        except Exception as e:
+            self.logger.error('Unable to run the benchmark: %s', e)
+
+            sys.exit(os.EX_SOFTWARE)
+
 def main():
     PARSER = argparse.ArgumentParser(
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index acb5c416..4d8bb4f3 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -52,4 +52,12 @@ if(Python3_Interpreter_FOUND)
             WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
         )
     endif()
+
+    if(H5BENCH_DLIO)
+        add_test(
+            NAME "h5bench-sync-dlio"
+            COMMAND Python3::Interpreter -m pytest --verbose --rootdir ${CMAKE_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/test_sync_dlio.py
+            WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
+        )
+    endif()
 endif()
\ No newline at end of file
diff --git a/tests/test_sync_dlio.py b/tests/test_sync_dlio.py
new file mode 100644
index 00000000..b3f6f346
--- /dev/null
+++ b/tests/test_sync_dlio.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+
+import os
+import glob
+import pytest
+
+from src import h5bench
+
+DEBUG = True
+ABORT = True
+VALIDATE = True
+
+BINARY = 'h5bench_dlio'
+
+samples = glob.glob('sync-dlio*.json')
+
+
+@pytest.mark.parametrize('configuration', samples)
+@pytest.mark.skipif(
+    not os.path.isfile(BINARY),
+    reason="DLIO is disabled"
+)
+def test_benchmark(configuration):
+    assert os.path.isfile(configuration)
+
+    benchmark = h5bench.H5bench(
+        configuration,
+        None,
+        DEBUG,
+        ABORT,
+        VALIDATE
+    )
+
+    benchmark.run()
diff --git a/workflows/h5bench-hdf5-1.14.0.yml b/workflows/h5bench-hdf5-1.14.0.yml
index 4d26a71c..1d15a38a 100644
--- a/workflows/h5bench-hdf5-1.14.0.yml
+++ b/workflows/h5bench-hdf5-1.14.0.yml
@@ -243,6 +243,11 @@ jobs:
           cd build-sync
          ./h5bench --debug --abort-on-failure ../samples/sync-e3sm.json

+      - name: Test h5bench SYNC dlio
+        run: |
+          cd build-sync
+          ./h5bench --debug --abort-on-failure ../samples/sync-dlio.json
+
       - name: Test h5bench ASYNC write/read
         run: |
           current="$PWD"