Zwu #88

Draft · wants to merge 10 commits into main
3 changes: 3 additions & 0 deletions build_dlio2.sh
@@ -0,0 +1,3 @@
#!/bin/bash

docker build -t dlio .
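Usage sketch: the dlio image tag built here is the one that launch_dlio.sh and run_dlio2.sh (below) refer to.

./build_dlio2.sh    # builds the "dlio" image from the Dockerfile in the current directory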
34 changes: 34 additions & 0 deletions configs/workload/dlrm.yaml
@@ -0,0 +1,34 @@
model: dlrm

framework: pytorch

workflow:
  generate_data: False
  train: True
  do_eval: True

dataset:
  data_folder: data/dlrm
  format: bin
  num_files_train: 1
  num_files_eval: 1
  num_samples_per_file: 4195198976
  record_length: 327680
  keep_files: True
  eval_num_samples_per_file: 91681240

reader:
  data_loader: terabyte
  batch_size: 2048
  batch_size_eval: 16384
  sample_shuffle: random

train:
  epochs: 1
  computation_time: 0.064296
  total_training_steps: 32768
  total_eval_steps: 2048

evaluation:
  eval_time: 0.0843
  steps_between_evals: 16384
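As a rough size check (an inference from the 40-column int32 layout that bin_generator.py in this PR writes, 1 label + 13 dense + 26 categorical per sample; the reading of record_length as one batch is an assumption, not a documented contract):

bytes_per_sample = (1 + 13 + 26) * 4            # 40 int32 values -> 160 bytes per sample
print(bytes_per_sample * 2048)                  # 327680: record_length matches one 2048-sample batch
print(bytes_per_sample * 4195198976 / 2**30)    # ~625 GiB for the single training .bin file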
11 changes: 2 additions & 9 deletions configs/workload/unet3d.yaml
@@ -5,33 +5,26 @@ framework: pytorch
workflow:
  generate_data: False
  train: True
  evaluation: True
  checkpoint: True

dataset:
  data_folder: ./data/unet3d/
  format: npz
  num_files_train: 168
  num_files_eval: 42
  num_samples_per_file: 1
  record_length: 234560851
  record_length_stdev: 109346892
  keep_files: True

reader:
  data_loader: pytorch
  batch_size: 2
  batch_size_eval: 1
  batch_size: 4
  read_threads: 4

train:
  epochs: 10
  computation_time: 0.753
  computation_time: 1.3604

evaluation:
  eval_time: 5.8
  epochs_between_evals: 2

checkpoint:
  checkpoint_after_epoch: 5
  epochs_between_checkpoints: 2
3 changes: 3 additions & 0 deletions exec.sh
@@ -0,0 +1,3 @@
#!/bin/bash
mpirun -np 8 python3 src/dlio_benchmark.py workload=dlrm
cp -r /workspace/dlio/hydra_log/dlrm/* /workspace/dlio/save_spot
3 changes: 3 additions & 0 deletions launch_dlio.sh
@@ -0,0 +1,3 @@
#!/bin/bash

docker run -t dlio python ./src/dlio_benchmark.py ++workload.workflow.generate_data=True
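A hypothetical variant, assuming the same Hydra override syntax, that would generate the DLRM dataset defined in configs/workload/dlrm.yaml instead of the default workload:

docker run -t dlio python ./src/dlio_benchmark.py workload=dlrm ++workload.workflow.generate_data=True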
19 changes: 19 additions & 0 deletions run_dlio2.sh
@@ -0,0 +1,19 @@
#!/bin/bash

container_name=${1:-dlrm_dlio2}

# Remove existing container if a previous run was interrupted
if [ "$(docker ps -a | grep "$container_name")" ]
then
    docker rm "$container_name"
fi

# docker run -it --rm --name=$container_name --gpus all -v /raid/data/dlrm_dlio2/dlio2:/workspace/dlio/data/dlrm dlio:latest /bin/bash exec.sh
sudo docker run -it \
    --rm \
    --name=$container_name \
    --gpus all \
    -v /raid/data/dlrm_dlio2/dlio2:/workspace/dlio/data/dlrm \
    -v /raid/data/dlrm_dlio2/dlio2_log:/workspace/dlio/save_spot \
    dlio:latest \
    /bin/bash exec.sh
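Usage sketch: the optional first argument overrides the container name (my_run below is a made-up example). Inside the container the image runs exec.sh, whose hydra_log copy lands in the dlio2_log directory mounted above.

./run_dlio2.sh            # container name defaults to dlrm_dlio2
./run_dlio2.sh my_run     # hypothetical custom container name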
2 changes: 2 additions & 0 deletions src/common/enumerations.py
@@ -90,6 +90,7 @@ class FormatType(Enum):
    HDF5_OPT = 'hdf5_opt'
    JPEG = 'jpeg'
    PNG = 'png'
    BIN = 'bin'

    def __str__(self):
        return self.value
@@ -100,6 +101,7 @@ class DataLoaderType(Enum):
    """
    TENSORFLOW='tensorflow'
    PYTORCH='pytorch'
    TERABYTE='terabyte'
    NONE='none'

    def __str__(self):
83 changes: 83 additions & 0 deletions src/data_generator/bin_generator.py
@@ -0,0 +1,83 @@
"""
The binary file generator designed for simulating DLRM in DLIO
"""

from src.common.enumerations import Compression
from src.data_generator.data_generator import DataGenerator

import logging
import numpy as np
from numpy import random
import math
import os

from src.utils.utility import progress
from shutil import copyfile

"""
Generator for creating data in BIN format.
"""

class BINGenerator(DataGenerator):
    def __init__(self):
        super().__init__()

    def generate(self):
        """
        Generate binary data for training and testing.
        """
        super().generate()

        for i in range(self.my_rank, int(self.total_files_to_generate), self.comm_size):
            progress(i+1, self.total_files_to_generate, "Generating Binary Data")
            out_path_spec = self.storage.get_uri(self._file_list[i])
            # File size will be different depending on training or validation file
            if i < self.num_files_train:
                # Generating Training files
                segment_size = 91681240*5
                num_instance = self.num_samples # 4195198976 for dlrm training
                parts = math.ceil(num_instance / segment_size)
                for k in range(0, parts):
                    num_written = segment_size if k < parts-1 else num_instance - k*segment_size
                    X_int = np.random.randint(2557264, size = (num_written, 13))
                    X_cat = np.random.randint(8831335, size = (num_written, 26))
                    y = np.random.randint(2, size=num_written)
                    np_data = np.concatenate([y.reshape(-1, 1), X_int, X_cat], axis=1)
                    np_data = np_data.astype(np.int32)
                    if self.compression != Compression.ZIP:
                        with open(out_path_spec, 'ab') as output_file:
                            output_file.write(np_data.tobytes())
                            output_file.flush()
                            os.fsync(output_file.fileno())
            else:
                # Generating Evaluation files

                #### Old implementation that wrote the whole file in one shot at the end
                #
                # num_instance = self.eval_num_samples_per_file # estimated as 6548660*14
                # X_int = np.random.randint(2557264, size = (num_instance, 13))
                # X_cat = np.random.randint(8831335, size = (num_instance, 26))
                # y = np.random.randint(2, size=num_instance)
                # np_data = np.concatenate([y.reshape(-1, 1), X_int, X_cat], axis=1)
                # np_data = np_data.astype(np.int32)
                # if self.compression != Compression.ZIP:
                #     with open(out_path_spec, 'wb') as output_file:
                #         output_file.write(np_data.tobytes())

                segment_size = 91681240*5
                num_instance = self.eval_num_samples_per_file # 91681240 for dlrm evaluation
                parts = math.ceil(num_instance / segment_size)
                for k in range(0, parts):
                    num_written = segment_size if k < parts-1 else num_instance - k*segment_size
                    X_int = np.random.randint(2557264, size = (num_written, 13))
                    X_cat = np.random.randint(8831335, size = (num_written, 26))
                    y = np.random.randint(2, size=num_written)
                    np_data = np.concatenate([y.reshape(-1, 1), X_int, X_cat], axis=1)
                    np_data = np_data.astype(np.int32)
                    if self.compression != Compression.ZIP:
                        with open(out_path_spec, 'ab') as output_file:
                            output_file.write(np_data.tobytes())
                            output_file.flush()
                            os.fsync(output_file.fileno())
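Not part of this PR: a minimal sketch of how a file produced by this generator could be read back, assuming the flat 40-column int32 layout above (1 label + 13 dense + 26 categorical). The PR's actual reading path is the terabyte data loader wired up in reader_factory.py.

import numpy as np

def read_batch(path, batch_index, batch_size=2048, cols=40):
    # hypothetical helper: read one contiguous batch of samples from a generated .bin file
    bytes_per_sample = cols * 4                                    # 40 int32 values per sample
    offset = batch_index * batch_size * bytes_per_sample
    data = np.fromfile(path, dtype=np.int32, count=batch_size * cols, offset=offset)
    data = data.reshape(-1, cols)
    return data[:, 0], data[:, 1:14], data[:, 14:]                 # y, X_int (13 cols), X_cat (26 cols)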


2 changes: 2 additions & 0 deletions src/data_generator/data_generator.py
@@ -52,6 +52,8 @@ def __init__(self):
        self.storage = StorageFactory().get_storage(self._args.storage_type, self._args.storage_root,
                                                    self._args.framework)

        self.eval_num_samples_per_file = self._args.eval_num_samples_per_file

    @abstractmethod
    def generate(self):
        if self.my_rank == 0:
3 changes: 3 additions & 0 deletions src/data_generator/generator_factory.py
@@ -23,6 +23,7 @@
from src.data_generator.npz_generator import NPZGenerator
from src.data_generator.jpeg_generator import JPEGGenerator
from src.data_generator.png_generator import PNGGenerator
from src.data_generator.bin_generator import BINGenerator



@@ -44,5 +45,7 @@ def get_generator(type):
            return JPEGGenerator()
        elif type == FormatType.PNG:
            return PNGGenerator()
        elif type == FormatType.BIN:
            return BINGenerator()
        else:
            raise Exception(str(ErrorCodes.EC1001))
47 changes: 40 additions & 7 deletions src/dlio_benchmark.py
@@ -112,6 +112,7 @@ def __init__(self, cfg):
        self.num_files_train = self.args.num_files_train
        self.num_samples = self.args.num_samples_per_file
        self.total_training_steps = self.args.total_training_steps
        self.total_eval_steps = self.args.total_eval_steps

        self.epochs = self.args.epochs
        self.batch_size = self.args.batch_size
@@ -142,6 +143,9 @@ def __init__(self, cfg):
        self.eval_after_epoch = self.args.eval_after_epoch
        self.epochs_between_evals = self.args.epochs_between_evals

        self.steps_between_evals = self.args.steps_between_evals
        self.eval_num_samples = self.args.eval_num_samples_per_file

        # Hold various lists/dicts for statistics
        self.time_to_load_train_batch = []
        self.time_to_process_train_batch = []
@@ -193,8 +197,8 @@ def _eval(self, epoch):
        step = 1
        total = math.floor(self.num_samples * self.num_files_eval / self.batch_size_eval / self.comm_size)
        t0 = time()
        reader = self.framework.get_reader(DatasetType.VALID)
        for batch in reader.next():

        for batch in self.framework.get_reader(DatasetType.VALID).next():
            self.stats.eval_batch_loaded(epoch, step, t0)

            if self.eval_time > 0:
@@ -207,18 +211,24 @@
            self.stats.eval_batch_processed(epoch, step, t0)

            step += 1
            if step > total:

            # if step >= self.total_eval_steps:
            #     return

            if step > total or step >= self.total_eval_steps:
                return step - 1

            self.framework.barrier()
            t0 = time()

        return step - 1

    def _train(self, epoch):
        """
        Training loop for reading the dataset and performing training computations.
        :return: returns total steps.
        """
        block = 1 # A continuous period of training steps, ended by checkpointing
        block = 1 # A continuous period of training steps, ended by checkpointing (and, with this change, evaluation)
        block_step = overall_step = 1 # Steps are taken within blocks
        max_steps = math.floor(self.num_samples * self.num_files_train / self.batch_size / self.comm_size)

@@ -245,6 +255,31 @@

            self.stats.batch_processed(epoch, overall_step, block, t0)

            # Perform evaluation during the epoch if required.
            # Assume that evaluation happens on all GPUs.
            if overall_step > 0 and overall_step % self.steps_between_evals == 0:
                # Before starting the evaluation, terminate the current block
                self.stats.end_block(epoch, block, block_step)

                # Initialize the eval data loader & perform evaluation
                self.stats.start_eval(epoch)
                self.framework.get_reader(DatasetType.VALID).read(epoch)
                self.framework.barrier()
                self._eval(epoch)
                self.stats.end_eval(epoch)
                self.framework.barrier()
                self.framework.get_reader(DatasetType.VALID).finalize()

                ##### Checkpoint after evaluation
                self.stats.start_ckpt(epoch, block, overall_step)
                self.framework.checkpoint(epoch, overall_step)
                self.stats.end_ckpt(epoch, block)
                self.framework.barrier()
                ##### Checkpoint end

                # Start recording the next block
                self.stats.start_block(epoch, block)


            if self.do_checkpoint and (self.steps_between_checkpoints>=0) and overall_step == self.next_checkpoint_step:
                self.stats.end_block(epoch, block, block_step)
Expand All @@ -266,7 +301,7 @@ def _train(self, epoch):
                if (block_step!=1 and self.do_checkpoint) or (not self.do_checkpoint):
                    self.stats.end_block(epoch, block, block_step-1)
                break

            overall_step += 1
            t0 = time()

@@ -314,8 +349,6 @@ def run(self):
            self.stats.end_epoch(epoch, steps)
            logging.debug(f"{utcnow()} Rank {self.my_rank} returned after {steps} steps.")



            self.framework.barrier()
            self.framework.get_reader(DatasetType.TRAIN).finalize()

4 changes: 4 additions & 0 deletions src/reader/reader_factory.py
@@ -54,6 +54,10 @@ def get_reader(type, data_loader, dataset_type):
            from src.reader.torch_data_loader_reader import TorchDataLoaderReader
            return TorchDataLoaderReader(dataset_type)
        # Implement other data loaders here
        # Terabyte data loader added to support the DLRM simulation
        elif data_loader == DataLoaderType.TERABYTE:
            from src.reader.terabyte_data_loader_reader import TeraBinLoaderReader
            return TeraBinLoaderReader(dataset_type)
        else:
            print("Data Loader %s is not implemented" % data_loader)
            raise Exception(str(ErrorCodes.EC1004))