-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Write to mlperf run_history bigquery table for mlperf runs #120
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
BENCHMARK_BQ_JOB_TABLE_NAME = "job_history" | ||
BENCHMARK_BQ_METRIC_TABLE_NAME = "metric_history" | ||
BENCHMARK_BQ_METADATA_TABLE_NAME = "metadata_history" | ||
BENCHMARK_BQ_RUN_TABLE_NAME = "run_history" | ||
|
||
|
||
@dataclasses.dataclass | ||
|
@@ -54,11 +55,36 @@ class MetadataHistoryRow: | |
metadata_value: str | ||
|
||
|
||
@dataclasses.dataclass | ||
class RunHistoryRow: | ||
uuid: str | ||
description: str | ||
platform: str | ||
date: datetime.datetime | ||
base_cl: str | ||
precision: str | ||
model: str | ||
multislice_topology: str | ||
num_params: int | ||
global_batch_size: int | ||
per_device_batch_size: float | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. per core or device? I feel core is widely used. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
num_chips: int | ||
step_time: float | ||
throughput: float | ||
per_device_tflops_per_sec: float | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see a little bit change of this name, so would like to double check, i.e. indicate teraflop? |
||
ici_mesh_shape: str | ||
dcn_mesh_shape: str | ||
xprof: str | ||
mfu: float | ||
xla_flags: str | ||
|
||
|
||
@dataclasses.dataclass | ||
class TestRun: | ||
job_history: JobHistoryRow | ||
metric_history: Iterable[MetricHistoryRow] | ||
metadata_history: Iterable[MetadataHistoryRow] | ||
run_history: RunHistoryRow = None | ||
|
||
|
||
class JobStatus(enum.Enum): | ||
|
@@ -105,6 +131,10 @@ def metric_history_table_id(self): | |
def metadata_history_table_id(self): | ||
return ".".join((self.project, self.database, BENCHMARK_BQ_METADATA_TABLE_NAME)) | ||
|
||
@property | ||
def run_history_table_id(self): | ||
return ".".join((self.project, self.database, BENCHMARK_BQ_RUN_TABLE_NAME)) | ||
|
||
def is_valid_metric(self, value: float): | ||
"""Check if float metric is valid for BigQuery table.""" | ||
invalid_values = [math.inf, -math.inf, math.nan] | ||
|
@@ -120,6 +150,11 @@ def insert(self, test_runs: Iterable[TestRun]) -> None: | |
# job history rows | ||
job_history_rows = [dataclasses.astuple(run.job_history)] | ||
|
||
# run history rows | ||
run_history_rows = ( | ||
[dataclasses.astuple(run.run_history)] if run.run_history else None | ||
) | ||
|
||
# metric history rows | ||
metric_history_rows = [] | ||
for each in run.metric_history: | ||
|
@@ -137,6 +172,7 @@ def insert(self, test_runs: Iterable[TestRun]) -> None: | |
(self.job_history_table_id, job_history_rows), | ||
(self.metric_history_table_id, metric_history_rows), | ||
(self.metadata_history_table_id, metadata_history_rows), | ||
(self.run_history_table_id, run_history_rows), | ||
]: | ||
if not rows: | ||
continue | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
import hashlib | ||
import os | ||
import re | ||
from typing import Dict, Iterable, List, Optional | ||
from typing import Dict, Iterable, List, Optional, Any | ||
import uuid | ||
from absl import logging | ||
import airflow | ||
|
@@ -214,20 +214,18 @@ def process_json_lines( | |
|
||
|
||
def process_tensorboard_summary( | ||
base_id: str, | ||
summary_config: metric_config.SummaryConfig, | ||
) -> (List[List[bigquery.MetricHistoryRow]], List[List[bigquery.MetadataHistoryRow]],): | ||
) -> (Dict[str, Any], Dict[str, Any],): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dict[str, float] & Dict[str, str]? Same comment for the func below |
||
"""Process metrics and dimensions from TensorBoard file. | ||
|
||
Args: | ||
base_id: The unique ID for this test job. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: remove base_id in comment |
||
summary_config: The configs for TensorBoard summary. | ||
|
||
Returns: | ||
A list of MetricHistoryRow for a test run, and | ||
a list of MetadataHistoryRow for a test run in a test job. | ||
A Dict of aggregated metrics and a Dict of metadata parsed | ||
from the given TensorBoard file. | ||
""" | ||
uuid = generate_row_uuid(base_id, 0) | ||
|
||
if isinstance(summary_config.file_location, airflow.XComArg): | ||
file_location = summary_config.file_location.resolve(get_current_context()) | ||
|
@@ -249,6 +247,26 @@ def process_tensorboard_summary( | |
aggregated_metrics[key] = aggregate_metrics(value, aggregation_strategy) | ||
print("aggregated_metrics", aggregated_metrics) | ||
|
||
return aggregated_metrics, metadata | ||
|
||
|
||
def get_metric_history_and_metadata_history_rows( | ||
base_id: str, | ||
aggregated_metrics: Dict[str, Any], | ||
metadata: Dict[str, Any], | ||
) -> (List[List[bigquery.MetricHistoryRow]], List[List[bigquery.MetadataHistoryRow]],): | ||
"""Create metric_history and metadata_history rows to insert into BigQuery | ||
|
||
Args: | ||
base_id: The unique ID for this test job. | ||
aggregated_metrics: Dict of aggregated metrics | ||
metadata: Dict of metadata | ||
|
||
Returns: | ||
A list of MetricHistoryRow for a test run, and | ||
a list of MetadataHistoryRow for a test run in a test job. | ||
""" | ||
uuid = generate_row_uuid(base_id, 0) | ||
metric_history_rows = [] | ||
metadata_history_rows = [] | ||
|
||
|
@@ -267,6 +285,73 @@ def process_tensorboard_summary( | |
return [metric_history_rows], [metadata_history_rows] | ||
|
||
|
||
def get_run_history_rows( | ||
base_id: str, | ||
aggregated_metrics: Dict[str, Any], | ||
metadata: Dict[str, Any], | ||
task_test_config: test_config.TestConfig[test_config.Tpu], | ||
) -> List[List[bigquery.RunHistoryRow]]: | ||
"""Create run_history rows to insert into BigQuery | ||
|
||
Args: | ||
base_id: The unique ID for this test job. | ||
aggregated_metrics: Dict of aggregated metrics | ||
metadata: Dict of metadata | ||
task_test_config: Test config | ||
|
||
Returns: | ||
A list of RunHistoryRow for a test run in a test job. | ||
""" | ||
uuid = generate_row_uuid(base_id, 0) | ||
dcn_data_parallelism = metadata["dcn_data_parallelism/text_summary"] | ||
dcn_fsdp_parallelism = metadata["dcn_fsdp_parallelism/text_summary"] | ||
dcn_sequence_parallelism = metadata["dcn_sequence_parallelism/text_summary"] | ||
dcn_tensor_parallelism = metadata["dcn_tensor_parallelism/text_summary"] | ||
dcn_autoregressive_parallelism = metadata[ | ||
"dcn_autoregressive_parallelism/text_summary" | ||
] | ||
ici_data_parallelism = metadata["ici_data_parallelism/text_summary"] | ||
ici_fsdp_parallelism = metadata["ici_fsdp_parallelism/text_summary"] | ||
ici_sequence_parallelism = metadata["ici_sequence_parallelism/text_summary"] | ||
ici_tensor_parallelism = metadata["ici_tensor_parallelism/text_summary"] | ||
ici_autoregressive_parallelism = metadata[ | ||
"ici_autoregressive_parallelism/text_summary" | ||
] | ||
throughput = ( | ||
int(metadata["global_batch_size_to_train_on/text_summary"]) | ||
* int(metadata["max_target_length/text_summary"]) | ||
) / aggregated_metrics["perf/step_time_seconds"] | ||
precision = ( | ||
"bfloat16" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will we always has either bf16 or int8? Or any other quantization is available? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now only bf16 and int8 are available |
||
if not metadata["quantization/text_summary"] | ||
else metadata["quantization/text_summary"] | ||
) | ||
|
||
run_history_row = bigquery.RunHistoryRow( | ||
uuid=uuid, | ||
description=task_test_config.test_name, | ||
platform="Cloud", | ||
date=datetime.datetime.now(), | ||
base_cl="", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the plan for this metadata? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not quite sure yet, I think I will need to add a change in MaxText to get this value into the tensorboard file. But for now will leave it blank. |
||
precision=precision, | ||
model=metadata["model_name/text_summary"], | ||
multislice_topology=f"{task_test_config.num_slices}x{task_test_config.accelerator.name}", | ||
num_params=metadata["num_model_parameters/text_summary"], | ||
global_batch_size=metadata["global_batch_size_to_train_on/text_summary"], | ||
per_device_batch_size=metadata["per_device_batch_size/text_summary"], | ||
num_chips=task_test_config.num_slices * task_test_config.accelerator.num_chips, | ||
step_time=aggregated_metrics["perf/step_time_seconds"], | ||
throughput=throughput, | ||
per_device_tflops_per_sec=aggregated_metrics["perf/per_device_tflops_per_sec"], | ||
ici_mesh_shape=f"[{ici_data_parallelism}, {ici_fsdp_parallelism}, {ici_sequence_parallelism}, {ici_tensor_parallelism}, {ici_autoregressive_parallelism}]", | ||
dcn_mesh_shape=f"[{dcn_data_parallelism}, {dcn_fsdp_parallelism}, {dcn_sequence_parallelism}, {dcn_tensor_parallelism}, {dcn_autoregressive_parallelism}]", | ||
xprof="", | ||
mfu=0.0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this be extracted from TensorBoard? |
||
xla_flags=metadata["libtpu_init_args/text_summary"], | ||
) | ||
return [run_history_row] | ||
|
||
|
||
def get_gcs_file_location_with_regex(file_location: str) -> str: | ||
""" | ||
Get a file from GCS given a regex in the form of `gs://<your_bucket>/<your_file_path_regex>`. | ||
|
@@ -605,6 +690,7 @@ def process_metrics( | |
metric_history_rows_list = [[]] | ||
metadata_history_rows_list = [[]] | ||
profile_history_rows_list = [] | ||
run_history_rows_list = [] | ||
|
||
# process metrics, metadata, and profile | ||
if task_metric_config is not None: | ||
|
@@ -613,10 +699,19 @@ def process_metrics( | |
base_id, task_metric_config.json_lines.file_location | ||
) | ||
if task_metric_config.tensorboard_summary: | ||
(aggregated_metrics, metadata) = process_tensorboard_summary( | ||
task_metric_config.tensorboard_summary | ||
) | ||
if task_gcp_config.dataset_name == metric_config.DatasetOption.MLPERF_DATASET: | ||
run_history_rows_list = get_run_history_rows( | ||
base_id, aggregated_metrics, metadata, task_test_config | ||
) | ||
( | ||
metric_history_rows_list, | ||
metadata_history_rows_list, | ||
) = process_tensorboard_summary(base_id, task_metric_config.tensorboard_summary) | ||
) = get_metric_history_and_metadata_history_rows( | ||
base_id, aggregated_metrics, metadata | ||
) | ||
if task_metric_config.profile: | ||
has_profile = True | ||
num_profiles = len(task_metric_config.profile.file_locations) | ||
|
@@ -671,10 +766,14 @@ def process_metrics( | |
job_name=benchmark_id, | ||
job_status=test_job_status.value, | ||
) | ||
run_history_row = ( | ||
run_history_rows_list[index] if index < len(run_history_rows_list) else None | ||
) | ||
test_run_row = bigquery.TestRun( | ||
job_history_row, | ||
metric_history_rows_list[index], | ||
metadata_history_rows_list[index], | ||
run_history_row, | ||
) | ||
test_run_rows.append(test_run_row) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to name it more specifically to mlperf? i.e.
mlperf_history
ormlperf_result
?