[WIP] Write to mlperf run_history bigquery table for mlperf runs #120

Status: Open. Wants to merge 1 commit into base: master.
1 change: 1 addition & 0 deletions xlml/apis/metric_config.py
@@ -23,6 +23,7 @@
class DatasetOption(enum.Enum):
  BENCHMARK_DATASET = "benchmark_dataset"
  XLML_DATASET = "xlml_dataset"
  MLPERF_DATASET = "mlperf_dataset"
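The new option is the switch that process_metrics (later in this diff) checks before writing run_history rows. A minimal sketch of opting in (hypothetical; only the enum above is assumed):

# Hypothetical: a GCP config would carry this as its dataset_name.
dataset = DatasetOption.MLPERF_DATASET
assert dataset.value == "mlperf_dataset"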


class FormatType(enum.Enum):
10 changes: 10 additions & 0 deletions xlml/apis/test_config.py
@@ -88,6 +88,16 @@ def name(self):
"""Name of this TPU type in the Cloud TPU API (e.g. 'v4-8')."""
return f'v{self.version.value}-{self.cores}'

@property
def num_chips(self):
"""Number of physical TPU chips"""
return (
self.cores
if self.version.value
and (self.version.value[-1] == 'e' or 'litepod' in self.version.value)
else self.cores / 2
)
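A quick sanity check of the chip arithmetic (hypothetical version names; the real TpuVersion members are defined elsewhere in this file):

# Hypothetical sketch, assuming Tpu(version=..., cores=...) roughly as above.
v4 = Tpu(version=TpuVersion.V4, cores=8)      # version.value == '4': two cores per chip
assert v4.num_chips == 4
v5e = Tpu(version=TpuVersion.V5E, cores=16)   # version.value == '5e': one core per chip
assert v5e.num_chips == 16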


@attrs.define
class Gpu(Accelerator):
36 changes: 36 additions & 0 deletions xlml/utils/bigquery.py
@@ -29,6 +29,7 @@
BENCHMARK_BQ_JOB_TABLE_NAME = "job_history"
BENCHMARK_BQ_METRIC_TABLE_NAME = "metric_history"
BENCHMARK_BQ_METADATA_TABLE_NAME = "metadata_history"
BENCHMARK_BQ_RUN_TABLE_NAME = "run_history"
Review comment (Collaborator): Do we want to name it more specifically to mlperf? i.e. mlperf_history or mlperf_result?



@dataclasses.dataclass
@@ -54,11 +55,36 @@ class MetadataHistoryRow:
  metadata_value: str


@dataclasses.dataclass
class RunHistoryRow:
  uuid: str
  description: str
  platform: str
  date: datetime.datetime
  base_cl: str
  precision: str
  model: str
  multislice_topology: str
  num_params: int
  global_batch_size: int
  per_device_batch_size: float
Review comment (Collaborator): Per core or per device? I feel 'core' is more widely used.
  num_chips: int
  step_time: float
  throughput: float
  per_device_tflops_per_sec: float
Review comment (Collaborator): I've seen this name change a bit, so I'd like to double-check: does it indicate teraflops?

  ici_mesh_shape: str
  dcn_mesh_shape: str
  xprof: str
  mfu: float
  xla_flags: str


@dataclasses.dataclass
class TestRun:
  job_history: JobHistoryRow
  metric_history: Iterable[MetricHistoryRow]
  metadata_history: Iterable[MetadataHistoryRow]
  run_history: Optional[RunHistoryRow] = None  # assumes typing.Optional is imported


class JobStatus(enum.Enum):
@@ -105,6 +131,10 @@ def metric_history_table_id(self):
  def metadata_history_table_id(self):
    return ".".join((self.project, self.database, BENCHMARK_BQ_METADATA_TABLE_NAME))

  @property
  def run_history_table_id(self):
    return ".".join((self.project, self.database, BENCHMARK_BQ_RUN_TABLE_NAME))

  def is_valid_metric(self, value: float):
    """Check if float metric is valid for BigQuery table."""
    invalid_values = [math.inf, -math.inf, math.nan]
@@ -120,6 +150,11 @@ def insert(self, test_runs: Iterable[TestRun]) -> None:
      # job history rows
      job_history_rows = [dataclasses.astuple(run.job_history)]

      # run history rows
      run_history_rows = (
          [dataclasses.astuple(run.run_history)] if run.run_history else None
      )

      # metric history rows
      metric_history_rows = []
      for each in run.metric_history:
@@ -137,6 +172,7 @@ def insert(self, test_runs: Iterable[TestRun]) -> None:
          (self.job_history_table_id, job_history_rows),
          (self.metric_history_table_id, metric_history_rows),
          (self.metadata_history_table_id, metadata_history_rows),
          (self.run_history_table_id, run_history_rows),
      ]:
        if not rows:
          continue
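Because run_history defaults to None and insert() skips any table whose row list is empty, existing callers that never populate run_history are unaffected. A small sketch of that guard (hypothetical table names and rows):

# Hypothetical: mirrors the `if not rows: continue` guard in insert() above.
for table_id, rows in [
    ("my-project.my_dataset.job_history", [("job", "row")]),
    ("my-project.my_dataset.run_history", None),  # TestRun without run_history
]:
  if not rows:
    continue  # the run_history insert is skipped entirely
  print(f"would insert {len(rows)} row(s) into {table_id}")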
113 changes: 106 additions & 7 deletions xlml/utils/metric.py
@@ -20,7 +20,7 @@
import hashlib
import os
import re
from typing import Dict, Iterable, List, Optional
from typing import Dict, Iterable, List, Optional, Any
import uuid
from absl import logging
import airflow
@@ -214,20 +214,18 @@ def process_json_lines(


def process_tensorboard_summary(
    summary_config: metric_config.SummaryConfig,
) -> (Dict[str, Any], Dict[str, Any],):
Review comment (Collaborator): Dict[str, float] & Dict[str, str]? Same comment for the func below.

"""Process metrics and dimensions from TensorBoard file.

Args:
base_id: The unique ID for this test job.
Review comment (Collaborator): Nit: remove base_id in comment.

    summary_config: The configs for TensorBoard summary.

  Returns:
    A Dict of aggregated metrics and a Dict of metadata parsed
    from the given TensorBoard file.
  """
  if isinstance(summary_config.file_location, airflow.XComArg):
    file_location = summary_config.file_location.resolve(get_current_context())
@@ -249,6 +247,26 @@
    aggregated_metrics[key] = aggregate_metrics(value, aggregation_strategy)
  logging.info("aggregated_metrics: %s", aggregated_metrics)

  return aggregated_metrics, metadata


def get_metric_history_and_metadata_history_rows(
    base_id: str,
    aggregated_metrics: Dict[str, Any],
    metadata: Dict[str, Any],
) -> (List[List[bigquery.MetricHistoryRow]], List[List[bigquery.MetadataHistoryRow]],):
  """Create metric_history and metadata_history rows to insert into BigQuery.

  Args:
    base_id: The unique ID for this test job.
    aggregated_metrics: Dict of aggregated metrics.
    metadata: Dict of metadata.

  Returns:
    A list of MetricHistoryRow for a test run, and
    a list of MetadataHistoryRow for a test run in a test job.
  """
  uuid = generate_row_uuid(base_id, 0)
  metric_history_rows = []
  metadata_history_rows = []

@@ -267,6 +285,73 @@
  return [metric_history_rows], [metadata_history_rows]
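Splitting row construction out of process_tensorboard_summary lets the MLPerf path reuse the parsed metrics before any rows are built. A minimal usage sketch (hypothetical base_id; summary_config as in metric_config.SummaryConfig):

# Hypothetical two-step usage of the refactored API.
aggregated_metrics, metadata = process_tensorboard_summary(summary_config)
metric_rows, metadata_rows = get_metric_history_and_metadata_history_rows(
    "my-base-id", aggregated_metrics, metadata
)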


def get_run_history_rows(
    base_id: str,
    aggregated_metrics: Dict[str, Any],
    metadata: Dict[str, Any],
    task_test_config: test_config.TestConfig[test_config.Tpu],
) -> List[List[bigquery.RunHistoryRow]]:
  """Create run_history rows to insert into BigQuery.

  Args:
    base_id: The unique ID for this test job.
    aggregated_metrics: Dict of aggregated metrics.
    metadata: Dict of metadata.
    task_test_config: Test config.

  Returns:
    A list of RunHistoryRow for a test run in a test job.
  """
  uuid = generate_row_uuid(base_id, 0)
  dcn_data_parallelism = metadata["dcn_data_parallelism/text_summary"]
  dcn_fsdp_parallelism = metadata["dcn_fsdp_parallelism/text_summary"]
  dcn_sequence_parallelism = metadata["dcn_sequence_parallelism/text_summary"]
  dcn_tensor_parallelism = metadata["dcn_tensor_parallelism/text_summary"]
  dcn_autoregressive_parallelism = metadata[
      "dcn_autoregressive_parallelism/text_summary"
  ]
  ici_data_parallelism = metadata["ici_data_parallelism/text_summary"]
  ici_fsdp_parallelism = metadata["ici_fsdp_parallelism/text_summary"]
  ici_sequence_parallelism = metadata["ici_sequence_parallelism/text_summary"]
  ici_tensor_parallelism = metadata["ici_tensor_parallelism/text_summary"]
  ici_autoregressive_parallelism = metadata[
      "ici_autoregressive_parallelism/text_summary"
  ]
  # Tokens/sec = (global batch size * sequence length) / step time.
  throughput = (
      int(metadata["global_batch_size_to_train_on/text_summary"])
      * int(metadata["max_target_length/text_summary"])
  ) / aggregated_metrics["perf/step_time_seconds"]
  precision = (
      "bfloat16"
      if not metadata["quantization/text_summary"]
      else metadata["quantization/text_summary"]
  )

Review comment (Collaborator): Will we always have either bf16 or int8? Or is any other quantization available?

Reply (Author): For now, only bf16 and int8 are available.

run_history_row = bigquery.RunHistoryRow(
uuid=uuid,
description=task_test_config.test_name,
platform="Cloud",
date=datetime.datetime.now(),
base_cl="",
Review comment (Collaborator): What's the plan for this metadata?

Reply (Author): I'm not quite sure yet. I think I will need to add a change in MaxText to get this value into the TensorBoard file, but for now I'll leave it blank.

      precision=precision,
      model=metadata["model_name/text_summary"],
      multislice_topology=f"{task_test_config.num_slices}x{task_test_config.accelerator.name}",
      num_params=metadata["num_model_parameters/text_summary"],
      global_batch_size=metadata["global_batch_size_to_train_on/text_summary"],
      per_device_batch_size=metadata["per_device_batch_size/text_summary"],
      num_chips=task_test_config.num_slices * task_test_config.accelerator.num_chips,
      step_time=aggregated_metrics["perf/step_time_seconds"],
      throughput=throughput,
      per_device_tflops_per_sec=aggregated_metrics["perf/per_device_tflops_per_sec"],
      ici_mesh_shape=f"[{ici_data_parallelism}, {ici_fsdp_parallelism}, {ici_sequence_parallelism}, {ici_tensor_parallelism}, {ici_autoregressive_parallelism}]",
      dcn_mesh_shape=f"[{dcn_data_parallelism}, {dcn_fsdp_parallelism}, {dcn_sequence_parallelism}, {dcn_tensor_parallelism}, {dcn_autoregressive_parallelism}]",
      xprof="",
      mfu=0.0,
Review comment (Collaborator): Will this be extracted from TensorBoard?

      xla_flags=metadata["libtpu_init_args/text_summary"],
  )
  return [run_history_row]
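For intuition, a worked example of the throughput computation above, with made-up numbers:

# Hypothetical values: tokens/sec = global batch * sequence length / step time.
global_batch_size = 256    # global_batch_size_to_train_on/text_summary
max_target_length = 2048   # max_target_length/text_summary
step_time_seconds = 2.0    # perf/step_time_seconds
throughput = (global_batch_size * max_target_length) / step_time_seconds
assert throughput == 262144.0  # tokens per second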


def get_gcs_file_location_with_regex(file_location: str) -> str:
"""
Get a file from GCS given a regex in the form of `gs://<your_bucket>/<your_file_path_regex>`.
Expand Down Expand Up @@ -605,6 +690,7 @@ def process_metrics(
  metric_history_rows_list = [[]]
  metadata_history_rows_list = [[]]
  profile_history_rows_list = []
  run_history_rows_list = []

  # process metrics, metadata, and profile
  if task_metric_config is not None:
@@ -613,10 +699,19 @@
          base_id, task_metric_config.json_lines.file_location
      )
    if task_metric_config.tensorboard_summary:
      (aggregated_metrics, metadata) = process_tensorboard_summary(
          task_metric_config.tensorboard_summary
      )
      if task_gcp_config.dataset_name == metric_config.DatasetOption.MLPERF_DATASET:
        run_history_rows_list = get_run_history_rows(
            base_id, aggregated_metrics, metadata, task_test_config
        )
      (
          metric_history_rows_list,
          metadata_history_rows_list,
      ) = get_metric_history_and_metadata_history_rows(
          base_id, aggregated_metrics, metadata
      )
    if task_metric_config.profile:
      has_profile = True
      num_profiles = len(task_metric_config.profile.file_locations)
@@ -671,10 +766,14 @@ def process_metrics(
        job_name=benchmark_id,
        job_status=test_job_status.value,
    )
    run_history_row = (
        run_history_rows_list[index] if index < len(run_history_rows_list) else None
    )
    test_run_row = bigquery.TestRun(
        job_history_row,
        metric_history_rows_list[index],
        metadata_history_rows_list[index],
        run_history_row,
    )
    test_run_rows.append(test_run_row)

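Only the TensorBoard-summary path fills run_history_rows_list, so the index guard above pairs each TestRun with a row when one exists and falls back to None otherwise:

# Hypothetical: two test runs, but only one RunHistoryRow was produced.
run_history_rows_list = ["row_for_run_0"]
for index in range(2):
  row = run_history_rows_list[index] if index < len(run_history_rows_list) else None
  print(index, row)  # prints: 0 row_for_run_0, then 1 None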
62 changes: 57 additions & 5 deletions xlml/utils/metric_test.py
@@ -82,6 +82,21 @@ def assert_metric_and_dimension_equal(
    for index in range(len(expected_metadata)):
      self.assertListEqual(actual_metadata[index], expected_metadata[index])

  def assert_aggregated_metrics_and_metadata_equal(
      self,
      actual_aggregated_metrics,
      expected_aggregated_metrics,
      actual_metadata,
      expected_metadata,
  ):
    for key, val in expected_aggregated_metrics.items():
      self.assertAlmostEqual(val, actual_aggregated_metrics[key])
    self.assertEqual(len(expected_aggregated_metrics), len(actual_aggregated_metrics))

    for key, val in expected_metadata.items():
      self.assertEqual(val, actual_metadata[key])
    self.assertEqual(len(expected_metadata), len(actual_metadata))

  @parameterized.named_parameters(
      ("default", "train_accuracy", None, None, True),
      ("inclusive_pattern", "train_accuracy", ["eval*"], None, False),
@@ -209,18 +224,52 @@ def test_process_tensorboard_summary(self):
        include_tag_patterns=None,
        exclude_tag_patterns=None,
    )
    actual_aggregated_metrics, actual_metadata = metric.process_tensorboard_summary(
        summary_config
    )
    expected_aggregated_metrics = {
        "loss": 0.123,
        "accuracy": 0.987,
    }
    expected_metadata = {
        "key1": "value1",
        "key2": "value2",
        "key3": "value3",
    }
    self.assert_aggregated_metrics_and_metadata_equal(
        actual_aggregated_metrics,
        expected_aggregated_metrics,
        actual_metadata,
        expected_metadata,
    )

  def test_get_metric_history_and_metadata_history_rows(self):
    base_id = "test"
    uuid = hashlib.sha256(str(base_id + "0").encode("utf-8")).hexdigest()
    test_aggregated_metrics = {
        "loss": 0.123,
        "accuracy": 0.987,
    }
    test_metadata = {
        "key1": "value1",
        "key2": "value2",
        "key3": "value3",
    }
    (
        actual_metric_history_rows,
        actual_metadata_history_rows,
    ) = metric.get_metric_history_and_metadata_history_rows(
        base_id,
        test_aggregated_metrics,
        test_metadata,
    )
    loss_metric = bigquery.MetricHistoryRow(
        job_uuid=uuid, metric_key="loss", metric_value=0.123
    )
    accuracy_metric = bigquery.MetricHistoryRow(
        job_uuid=uuid, metric_key="accuracy", metric_value=0.987
    )
    expected_metric_history_rows = [[loss_metric, accuracy_metric]]
    dimension_1 = bigquery.MetadataHistoryRow(
        job_uuid=uuid, metadata_key="key1", metadata_value="value1"
    )
@@ -230,9 +279,12 @@ def test_process_tensorboard_summary(self):
    dimension_3 = bigquery.MetadataHistoryRow(
        job_uuid=uuid, metadata_key="key3", metadata_value="value3"
    )
    expected_metadata_history_rows = [[dimension_1, dimension_2, dimension_3]]
    self.assert_metric_and_dimension_equal(
        actual_metric_history_rows,
        expected_metric_history_rows,
        actual_metadata_history_rows,
        expected_metadata_history_rows,
    )

  @parameterized.named_parameters(