Skip to content

Commit

Permalink
Internal enhancements (#336)
Browse files Browse the repository at this point in the history
* Switch back to p90

* Implement latency ceiling triggers

* Updates

* Make table sizing run faster

* Fix query routing location bug
  • Loading branch information
geoffxy authored Oct 30, 2023
1 parent fe547ba commit 56c51df
Show file tree
Hide file tree
Showing 21 changed files with 191 additions and 78 deletions.
16 changes: 10 additions & 6 deletions config/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ triggers:
ceiling_s: 30.0
sustained_epochs: 3

# Transaction latency ceiling trigger: presumably fires a replan when the p90
# transaction latency stays at or above `ceiling_s` (seconds) for
# `sustained_epochs` consecutive epochs — confirm against the
# TransactionLatencyCeiling trigger implementation.
txn_latency_ceiling:
  ceiling_s: 0.030
  sustained_epochs: 3

###
### Beam planning constants.
###
Expand Down Expand Up @@ -311,19 +315,19 @@ aurora_txns:
C_2: 0.00108688

# Used for latency.
K: 1.0581694841384888
b_p50: 0.0022556402254849672
b_p95: 0.00383179122582078
K: 1.0194002389907837
b_p50: 0.005365931428968906
b_p90: 0.005891922861337662 # TODO: Update

imdb_extended_100g:
# Used for "load translation"
C_1: 0.00282164
C_2: 0.00108688

# Used for latency.
K: 1.0729172229766846
b_p50: 0.001035563531331718
b_p95: 0.00274773221462965
K: 1.0811012983322144
b_p50: 0.0008631267119199038
b_p90: 0.002251814818009734


aurora_scaling:
Expand Down
2 changes: 1 addition & 1 deletion config/temp_config_sample.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
latency_ceiling_s: 30.0
txn_latency_p50_ceiling_s: 0.020 # Currently unused.
txn_latency_p95_ceiling_s: 0.030
txn_latency_p90_ceiling_s: 0.030

# Use this instead of the individual paths below.
std_dataset_path: workloads/IMDB_20GB/regular_test/
Expand Down
34 changes: 22 additions & 12 deletions src/brad/admin/run_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def parse_metrics(kv_str: str) -> Dict[str, float]:
return metrics


def run_planner(args) -> None:
async def run_planner_impl(args) -> None:
"""
This admin action is used to manually test the blueprint planner
independently of the rest of BRAD.
Expand Down Expand Up @@ -153,17 +153,20 @@ def run_planner(args) -> None:
)
workload_dir = pathlib.Path(args.workload_dir)
table_sizer = TableSizer(engines, config)
builder = WorkloadBuilder()
workload = (
builder.add_analytical_queries_and_counts_from_file(
builder = (
WorkloadBuilder()
.add_analytical_queries_and_counts_from_file(
args.query_bank_file,
args.query_counts_file,
)
.add_transactional_queries_from_file(workload_dir / "oltp.sql")
.for_period(timedelta(hours=1))
.table_sizes_from_engines(blueprint_mgr.get_blueprint(), table_sizer)
.build()
)
workload = (
await builder.table_sizes_from_engines(
blueprint_mgr.get_blueprint(), table_sizer
)
).build()

elif args.workload_source == "workload_dir":
assert args.analytical_rate_per_s is not None
Expand All @@ -173,17 +176,20 @@ def run_planner(args) -> None:
)
table_sizer = TableSizer(engines, config)
workload_dir = pathlib.Path(args.workload_dir)
builder = WorkloadBuilder()
workload = (
builder.add_analytical_queries_from_file(workload_dir / "olap.sql")
builder = (
WorkloadBuilder()
.add_analytical_queries_from_file(workload_dir / "olap.sql")
.add_transactional_queries_from_file(workload_dir / "oltp.sql")
.uniform_per_analytical_query_rate(
args.analytical_rate_per_s, period=timedelta(seconds=1)
)
.for_period(timedelta(hours=1))
.table_sizes_from_engines(blueprint_mgr.get_blueprint(), table_sizer)
.build()
)
workload = (
await builder.table_sizes_from_engines(
blueprint_mgr.get_blueprint(), table_sizer
)
).build()

# 5. Load the pre-computed predictions.
prediction_dir = pathlib.Path(args.predictions_dir)
Expand Down Expand Up @@ -234,7 +240,7 @@ def run_planner(args) -> None:
# TODO: Make this configurable.
comparator=best_cost_under_perf_ceilings(
max_query_latency_s=args.latency_ceiling_s,
max_txn_p95_latency_s=0.020, # FIXME: Add command-line argument if needed.
max_txn_p90_latency_s=0.020, # FIXME: Add command-line argument if needed.
),
metrics_provider=metrics_provider,
data_access_provider=data_access_provider,
Expand Down Expand Up @@ -278,4 +284,8 @@ async def on_new_blueprint(blueprint: Blueprint, score: Score):
)


def run_planner(args) -> None:
    """Synchronous CLI entry point.

    Builds the async planner coroutine and drives it to completion on a
    fresh event loop via ``asyncio.run``.
    """
    planner_coro = run_planner_impl(args)
    asyncio.run(planner_coro)


_PICKLE_FILE_NAME = "workload.pickle"
4 changes: 2 additions & 2 deletions src/brad/config/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class FrontEndMetric(enum.Enum):
TxnEndPerSecond = "txn_end_per_s"

QueryLatencySecondP50 = "query_latency_s_p50"
QueryLatencySecondP95 = "query_latency_s_p95"
QueryLatencySecondP90 = "query_latency_s_p90"

TxnLatencySecondP50 = "txn_latency_s_p50"
TxnLatencySecondP95 = "txn_latency_s_p95"
TxnLatencySecondP90 = "txn_latency_s_p90"
4 changes: 2 additions & 2 deletions src/brad/config/temp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def latency_ceiling_s(self) -> float:
def txn_latency_p50_ceiling_s(self) -> float:
return float(self._raw["txn_latency_p50_ceiling_s"])

def txn_latency_p95_ceiling_s(self) -> float:
return float(self._raw["txn_latency_p95_ceiling_s"])
def txn_latency_p90_ceiling_s(self) -> float:
    """Ceiling (in seconds) on p90 transaction latency, read from the raw config."""
    raw_value = self._raw["txn_latency_p90_ceiling_s"]
    return float(raw_value)

def std_dataset_path(self) -> Optional[pathlib.Path]:
if "std_dataset_path" not in self._raw:
Expand Down
4 changes: 2 additions & 2 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async def _run_setup(self) -> None:
)
comparator = best_cost_under_perf_ceilings(
max_query_latency_s=self._temp_config.latency_ceiling_s(),
max_txn_p95_latency_s=self._temp_config.txn_latency_p95_ceiling_s(),
max_txn_p90_latency_s=self._temp_config.txn_latency_p90_ceiling_s(),
)
else:
logger.warning(
Expand All @@ -188,7 +188,7 @@ async def _run_setup(self) -> None:
latency_scorer = _NoopAnalyticsScorer()
data_access_provider = _NoopDataAccessProvider()
comparator = best_cost_under_perf_ceilings(
max_query_latency_s=10, max_txn_p95_latency_s=0.030
max_query_latency_s=10, max_txn_p90_latency_s=0.030
)

self._planner = BlueprintPlannerFactory.create(
Expand Down
18 changes: 9 additions & 9 deletions src/brad/daemon/front_end_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def __init__(
FrontEndMetric.TxnEndPerSecond.value,
FrontEndMetric.QueryLatencySecondP50.value,
FrontEndMetric.TxnLatencySecondP50.value,
FrontEndMetric.QueryLatencySecondP95.value,
FrontEndMetric.TxnLatencySecondP95.value,
FrontEndMetric.QueryLatencySecondP90.value,
FrontEndMetric.TxnLatencySecondP90.value,
]
self._values_df = pd.DataFrame(columns=self._ordered_metrics.copy())
self._logger = MetricsLogger.create_from_config(
Expand Down Expand Up @@ -149,26 +149,26 @@ async def fetch_latest(self) -> None:
"Missing latency sketch values for %s", metric_key
)
p50_val = 0.0
p95_val = 0.0
p90_val = 0.0
else:
p50_val_cand = merged.get_quantile_value(0.5)
p95_val_cand = merged.get_quantile_value(0.9)
p90_val_cand = merged.get_quantile_value(0.9)
p50_val = p50_val_cand if p50_val_cand is not None else 0.0
p95_val = p95_val_cand if p95_val_cand is not None else 0.0
p90_val = p90_val_cand if p90_val_cand is not None else 0.0

if metric_key == _MetricKey.QueryLatencySecond:
data_cols[FrontEndMetric.QueryLatencySecondP50.value].append(
p50_val
)
data_cols[FrontEndMetric.QueryLatencySecondP95.value].append(
p95_val
data_cols[FrontEndMetric.QueryLatencySecondP90.value].append(
p90_val
)
else:
data_cols[FrontEndMetric.TxnLatencySecondP50.value].append(
p50_val
)
data_cols[FrontEndMetric.TxnLatencySecondP95.value].append(
p95_val
data_cols[FrontEndMetric.TxnLatencySecondP90.value].append(
p90_val
)
else:
logger.warning("Unhandled front end metric: %s", metric_key)
Expand Down
2 changes: 1 addition & 1 deletion src/brad/planner/beam/query_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
# 1. Fetch the next workload and apply predictions.
metrics, metrics_timestamp = self._metrics_provider.get_metrics()
logger.debug("Using metrics: %s", str(metrics))
current_workload, next_workload = self._workload_provider.get_workloads(
current_workload, next_workload = await self._workload_provider.get_workloads(
metrics_timestamp, window_multiplier, desired_period=timedelta(hours=1)
)
self._analytics_latency_scorer.apply_predicted_latencies(next_workload)
Expand Down
2 changes: 1 addition & 1 deletion src/brad/planner/beam/table_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:

# 1. Fetch metrics and the next workload and then apply predictions.
metrics, metrics_timestamp = self._metrics_provider.get_metrics()
current_workload, next_workload = self._workload_provider.get_workloads(
current_workload, next_workload = await self._workload_provider.get_workloads(
metrics_timestamp, window_multiplier, desired_period=timedelta(hours=1)
)
self._analytics_latency_scorer.apply_predicted_latencies(next_workload)
Expand Down
11 changes: 11 additions & 0 deletions src/brad/planner/beam/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from brad.planner.triggers.elapsed_time import ElapsedTimeTrigger
from brad.planner.triggers.query_latency_ceiling import QueryLatencyCeiling
from brad.planner.triggers.trigger import Trigger
from brad.planner.triggers.txn_latency_ceiling import TransactionLatencyCeiling
from brad.planner.triggers.variable_costs import VariableCosts


Expand Down Expand Up @@ -64,4 +65,14 @@ def get_beam_triggers(
)
)

txn_latency_ceiling = trigger_config["txn_latency_ceiling"]
if "disabled" not in txn_latency_ceiling:
trigger_list.append(
TransactionLatencyCeiling(
monitor,
latency_ceiling["ceiling_s"],
latency_ceiling["sustained_epochs"],
)
)

return trigger_list
18 changes: 9 additions & 9 deletions src/brad/planner/compare/cost.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,27 @@ def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> boo

def best_cost_under_perf_ceilings(
max_query_latency_s: float,
max_txn_p95_latency_s: float,
max_txn_p90_latency_s: float,
) -> BlueprintComparator:
def is_better_than(left: ComparableBlueprint, right: ComparableBlueprint) -> bool:
# Check transactional latency ceilings first.
left_txn_p95 = left.get_predicted_transactional_latencies()[0]
right_txn_p95 = right.get_predicted_transactional_latencies()[0]
left_txn_p90 = left.get_predicted_transactional_latencies()[0]
right_txn_p90 = right.get_predicted_transactional_latencies()[0]

# If one of these candidates have NaN predictions, we need to
# consider other factors. NaN indicates that a prediction is not
# available (e.g., due to missing metrics).
if not math.isnan(left_txn_p95) and not math.isnan(right_txn_p95):
if not math.isnan(left_txn_p90) and not math.isnan(right_txn_p90):
# Both above the ceiling, return the blueprint that does better on
# performance.
if (
left_txn_p95 > max_txn_p95_latency_s
and right_txn_p95 > max_txn_p95_latency_s
left_txn_p90 > max_txn_p90_latency_s
and right_txn_p90 > max_txn_p90_latency_s
):
return left_txn_p95 < right_txn_p95
elif left_txn_p95 > max_txn_p95_latency_s:
return left_txn_p90 < right_txn_p90
elif left_txn_p90 > max_txn_p90_latency_s:
return False
elif right_txn_p95 > max_txn_p95_latency_s:
elif right_txn_p90 > max_txn_p90_latency_s:
return True

# Query latency ceilings.
Expand Down
12 changes: 6 additions & 6 deletions src/brad/planner/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"aurora_reader_load_minute_avg",
"txn_completions_per_s",
"txn_lat_s_p50",
"txn_lat_s_p95",
"txn_lat_s_p90",
],
)

Expand Down Expand Up @@ -174,13 +174,13 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
default_value=0.0,
name="txn_lat_s_p50",
)
txn_lat_s_p95 = self._extract_most_recent_possibly_missing(
txn_lat_s_p90 = self._extract_most_recent_possibly_missing(
front_end.loc[
front_end.index <= most_recent_common,
FrontEndMetric.TxnLatencySecondP95.value,
FrontEndMetric.TxnLatencySecondP90.value,
],
default_value=0.0,
name="txn_lat_s_p95",
name="txn_lat_s_p90",
)

aurora_writer_rel = aurora_writer.loc[aurora_writer.index <= most_recent_common]
Expand Down Expand Up @@ -226,7 +226,7 @@ def get_metrics(self) -> Tuple[Metrics, datetime]:
aurora_reader_load_minute_avg=aurora_reader_load_minute,
txn_completions_per_s=txn_per_s,
txn_lat_s_p50=txn_lat_s_p50,
txn_lat_s_p95=txn_lat_s_p95,
txn_lat_s_p90=txn_lat_s_p90,
),
most_recent_common.to_pydatetime(),
)
Expand Down Expand Up @@ -292,5 +292,5 @@ def _recover_load_value(self, aurora_rel: pd.DataFrame, metric_name: str) -> flo
_FRONT_END_METRICS = [
FrontEndMetric.TxnEndPerSecond.value,
FrontEndMetric.TxnLatencySecondP50.value,
FrontEndMetric.TxnLatencySecondP95.value,
FrontEndMetric.TxnLatencySecondP90.value,
]
2 changes: 1 addition & 1 deletion src/brad/planner/neighborhood/neighborhood.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
# the daemon process.
logger.info("Running a replan.")
self._log_current_metrics()
current_workload, next_workload = self._workload_provider.get_workloads(
current_workload, next_workload = await self._workload_provider.get_workloads(
datetime.now().astimezone(pytz.utc), window_multiplier
)
workload_filters = [
Expand Down
1 change: 1 addition & 0 deletions src/brad/planner/scoring/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async def simulate_current_workload_routing(self, router: Router) -> None:
maybe_eng = query.primary_execution_location()
if maybe_eng is not None:
self.current_query_locations[maybe_eng].append(qidx)
continue

# Fall back to the router if the historical routing location is not
# available.
Expand Down
6 changes: 3 additions & 3 deletions src/brad/planner/scoring/performance/unified_aurora.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,14 @@ def _scale_txn_latency(
to_prov: Provisioning,
ctx: "ScoringContext",
) -> npt.NDArray:
observed_lats = np.array([ctx.metrics.txn_lat_s_p50, ctx.metrics.txn_lat_s_p95])
observed_lats = np.array([ctx.metrics.txn_lat_s_p50, ctx.metrics.txn_lat_s_p90])

# Q(u) = a / (K - u) + b ; u is CPU utilization in [0, 1]
# --> Q(u') = (K - u) / (K - u') (Q(u) - b) + b

model = ctx.planner_config.aurora_txn_coefs(ctx.schema_name)
K = model["K"]
b = np.array([model["b_p50"], model["b_p95"]])
b = np.array([model["b_p50"], model["b_p90"]])

curr_num_cpus = aurora_num_cpus(curr_prov)
next_num_cpus = aurora_num_cpus(to_prov)
Expand Down Expand Up @@ -301,7 +301,7 @@ def add_debug_values(self, dest: Dict[str, int | float | str]) -> None:
dest["aurora_pred_txn_peak_cpu_denorm"] = self.pred_txn_peak_cpu_denorm
(
dest["aurora_pred_txn_lat_s_p50"],
dest["aurora_pred_txn_lat_s_p95"],
dest["aurora_pred_txn_lat_s_p90"],
) = self.scaled_txn_lats
dest.update(self.debug_values)

Expand Down
Loading

0 comments on commit 56c51df

Please sign in to comment.