use monitored barrier
IlyasMoutawwakil committed Feb 19, 2024
1 parent 8e0ad46 commit ff729ba
Showing 7 changed files with 25 additions and 16 deletions.
8 changes: 8 additions & 0 deletions optimum_benchmark/experiment.py
@@ -19,6 +19,7 @@
 from .launchers.base import Launcher
 from .backends.base import Backend
 
+import pandas as pd
 from json import dump
 from flatten_dict import flatten
 from hydra.utils import get_class

@@ -64,6 +65,13 @@ def to_json(self, path: str, flat: bool = False) -> None:
         with open(path, "w") as f:
             dump(self.to_dict(), f, indent=4)
 
+    def to_dataframe(self) -> pd.DataFrame:
+        flat_report_dict = self.to_flat_dict()
+        return pd.DataFrame.from_dict(flat_report_dict, orient="index")
+
+    def to_csv(self, path: str) -> None:
+        self.to_dataframe().to_csv(path, index=False)
+
     def save_pretrained(
         self,
         save_directory: Union[str, os.PathLike],
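
As a standalone illustration (not part of this commit), this is what the new to_dataframe()/to_csv() helpers boil down to: a flattened report dict becomes a one-column DataFrame indexed by the flat keys. The sample dict and file name below are invented for the example.

    import pandas as pd

    # a flattened report dict of the kind to_flat_dict() would return (values invented here)
    flat_report_dict = {"latency.mean": 0.012, "latency.stdev": 0.001, "memory.max_ram_mb": 1024}

    # orient="index" makes each flattened key a row, with the values in a single column
    df = pd.DataFrame.from_dict(flat_report_dict, orient="index")
    print(df)

    # index=False omits the index (the flattened keys) from the CSV, as in the diff
    df.to_csv("experiment_report.csv", index=False)
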
7 changes: 5 additions & 2 deletions optimum_benchmark/launchers/process/launcher.py
@@ -42,6 +42,9 @@ def launch(self, worker: Callable, *worker_args) -> BenchmarkReport:
         while not process_context.join():
             pass
 
+        # restore the original logging configuration
+        setup_logging(log_level)
+
         try:
             report: BenchmarkReport = queue.get()
         except EOFError:

@@ -50,13 +53,13 @@ def launch(self, worker: Callable, *worker_args) -> BenchmarkReport:
         return report
 
 
-def entrypoint(_, worker, queue, lock, log_level, *worker_args):
+def entrypoint(i, worker, queue, lock, log_level, *worker_args):
     """
     This a pickalable function that correctly sets up the logging configuration for the worker process,
     and puts the output of the worker function into a lock-protected queue.
     """
 
-    setup_logging(log_level)
+    setup_logging(log_level, prefix=f"PROC-{i}")
 
     worker_output = worker(*worker_args)
 
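
The library's actual setup_logging helper is not shown in this diff; as a rough, hypothetical sketch of what passing prefix=f"PROC-{i}" enables, a prefix-aware setup could look like the following (the function body and format string are assumptions, not the library's real implementation).

    import logging

    def setup_logging(level: str = "INFO", prefix: str = "") -> None:
        # tag every record emitted by this process with its prefix, e.g. [PROC-0]
        fmt = "[%(asctime)s]" + (f"[{prefix}]" if prefix else "") + "[%(levelname)s] - %(message)s"
        logging.basicConfig(level=level, format=fmt, force=True)  # force=True replaces existing handlers

    # in worker i of the spawned process group:
    # setup_logging("INFO", prefix=f"PROC-{i}")
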
4 changes: 3 additions & 1 deletion optimum_benchmark/launchers/torchrun/launcher.py
@@ -59,6 +59,9 @@ def launch(self, worker: Callable, *worker_args) -> Dict[str, Any]:
             entrypoint=entrypoint, args=(worker, queue, lock, log_level, *worker_args), config=launch_config
         )
 
+        # restore the original logging configuration
+        setup_logging(log_level)
+
         outputs: List[BenchmarkReport] = []
         while not queue.empty():
             outputs.append(queue.get())

@@ -71,7 +74,6 @@ def launch(self, worker: Callable, *worker_args) -> Dict[str, Any]:
         else:
             raise ValueError("No benchmark report was returned by the workers")
 
-        setup_logging(level=log_level)
         report.log()
 
         return report

4 changes: 2 additions & 2 deletions optimum_benchmark/trackers/energy.py
@@ -134,14 +134,14 @@ def track(self, interval=1, file_prefix="method"):
         )
 
         if self.distributed:
-            torch.distributed.barrier()
+            torch.distributed.monitored_barrier()
 
         self.emission_tracker.start()
         yield
         self.emission_tracker.stop()
 
         if self.distributed:
-            torch.distributed.barrier()
+            torch.distributed.monitored_barrier()
 
         self.cpu_energy = self.emission_tracker._total_cpu_energy.kWh
         self.gpu_energy = self.emission_tracker._total_gpu_energy.kWh
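
For context, a minimal standalone sketch (not taken from this repo) of why monitored_barrier is used here: unlike barrier(), it raises a descriptive error naming the ranks that never reached the synchronization point instead of hanging indefinitely. Note that monitored_barrier is only supported with the GLOO backend (or a gloo process group); the helper name below is invented for the example.

    from datetime import timedelta

    import torch.distributed as dist

    def sync_all_ranks() -> None:
        if dist.is_available() and dist.is_initialized():
            # raises a RuntimeError listing the unresponsive ranks if the timeout expires,
            # rather than blocking forever the way barrier() can
            dist.monitored_barrier(timeout=timedelta(seconds=300))
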
4 changes: 2 additions & 2 deletions optimum_benchmark/trackers/latency.py
@@ -109,15 +109,15 @@ def reset(self):
     @contextmanager
     def track(self):
         if self.distributed:
-            torch.distributed.barrier()
+            torch.distributed.monitored_barrier()
 
         if self.backend == "pytorch" and self.device == "cuda":
             yield from self._pytorch_cuda_latency()
         else:
             yield from self._cpu_latency()
 
         if self.distributed:
-            torch.distributed.barrier()
+            torch.distributed.monitored_barrier()
 
     def _pytorch_cuda_latency(self):
         start = torch.cuda.Event(enable_timing=True)
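
The _pytorch_cuda_latency context above relies on CUDA events; as a general PyTorch pattern (a sketch, not the repo's exact implementation), timing a GPU workload with events looks like this. cuda_elapsed_seconds and fn are illustrative names.

    import torch

    def cuda_elapsed_seconds(fn) -> float:
        start = torch.cuda.Event(enable_timing=True)
        end = torch.cuda.Event(enable_timing=True)
        start.record()
        fn()  # the GPU work being measured
        end.record()
        torch.cuda.synchronize()  # wait for all queued kernels before reading the timers
        return start.elapsed_time(end) / 1e3  # elapsed_time returns milliseconds
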
4 changes: 2 additions & 2 deletions optimum_benchmark/trackers/memory.py
@@ -103,7 +103,7 @@ def reset(self):
     @contextmanager
     def track(self):
         if self.distributed:
-            torch.distributed.barrier()
+            torch.distributed.monitored_barrier()
 
         if self.device == "cuda" and self.backend == "pytorch":
             yield from self._cuda_pytorch_memory()

@@ -113,7 +113,7 @@ def track(self):
             yield from self._cpu_memory()
 
         if self.distributed:
-            torch.distributed.barrier()
+            torch.distributed.monitored_barrier()
 
     def _cuda_pytorch_memory(self):
         torch.cuda.empty_cache()
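
All three trackers touched by this commit share the same barrier-fenced track() shape; a self-contained sketch of that pattern (simplified relative to the real trackers, and assuming torch.distributed is already initialized with a gloo-compatible group) is:

    from contextlib import contextmanager

    import torch.distributed as dist

    @contextmanager
    def track(distributed: bool):
        if distributed:
            dist.monitored_barrier()  # all ranks line up before the measured region
        yield  # the caller's measured work runs here
        if distributed:
            dist.monitored_barrier()  # and line up again once it is done
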
10 changes: 3 additions & 7 deletions tests/test_api.py
@@ -147,14 +147,8 @@ def test_api_dataset_generator(library, task, model):
 @pytest.mark.parametrize("launcher_config", LAUNCHER_CONFIGS)
 @pytest.mark.parametrize("device", DEVICES)
 def test_api_launch(launcher_config, device):
-    device_ids = None
-
-    if device == "cuda":
-        device_ids = ",".join(str(i) for i in range(torch.cuda.device_count()))
-
-    # only inference cuz training is slow
     benchmark_config = InferenceConfig(latency=True, memory=True)
-    # only pytorch backend cuz default
+    device_ids = ",".join(str(i) for i in range(torch.cuda.device_count())) if device == "cuda" else None
     backend_config = PyTorchConfig(model="bert-base-uncased", device_ids=device_ids, no_weights=True, device=device)
     experiment_config = ExperimentConfig(
         experiment_name="api-experiment", benchmark=benchmark_config, launcher=launcher_config, backend=backend_config

@@ -164,6 +158,8 @@ def test_api_launch(launcher_config, device):
     with TemporaryDirectory() as tempdir:
         experiment_config.to_dict()
         experiment_config.to_flat_dict()
+        experiment_config.to_dataframe()
+        experiment_config.to_csv(f"{tempdir}/experiment_config.csv")
         experiment_config.to_json(f"{tempdir}/experiment_config.json")
 
         benchmark_report.to_dict()
