Skip to content

Commit

Permalink
Fix the variable costs trigger (#343)
Browse files (browse the repository at this point in the history)
  • Loading branch information
geoffxy authored Nov 1, 2023
1 parent 633c524 commit 27211d8
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions src/brad/planner/triggers/variable_costs.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytz
import pandas as pd
from datetime import datetime, timedelta
from typing import List
from typing import List, Tuple

from .trigger import Trigger
from brad.config.engine import Engine
Expand Down Expand Up @@ -60,7 +60,13 @@ async def should_replan(self) -> bool:
)
return False

current_hourly_cost = await self._estimate_current_scan_hourly_cost()
aurora_cost, athena_cost = await self._estimate_current_scan_hourly_cost()

if self._planner_config.use_io_optimized_aurora():
current_hourly_cost = athena_cost
else:
current_hourly_cost = athena_cost + aurora_cost

if current_hourly_cost <= 1e-5:
# Treated as 0.
logger.debug(
Expand All @@ -85,9 +91,9 @@ async def should_replan(self) -> bool:

return False

async def _estimate_current_scan_hourly_cost(self) -> float:
async def _estimate_current_scan_hourly_cost(self) -> Tuple[float, float]:
if self._current_blueprint is None:
return 0.0
return 0.0, 0.0

# Extract the queries seen in the last window.
window_end = datetime.now()
Expand All @@ -107,7 +113,7 @@ async def _estimate_current_scan_hourly_cost(self) -> float:
)
)
if len(workload.analytical_queries()) == 0:
return 0.0
return 0.0, 0.0
self._data_access_provider.apply_access_statistics(workload)

# Compute the scan cost of this last window of queries.
Expand All @@ -120,14 +126,20 @@ async def _estimate_current_scan_hourly_cost(self) -> float:
)

for idx, q in enumerate(workload.analytical_queries()):
engine = await router.engine_for(q)
maybe_engine = q.primary_execution_location()
if maybe_engine is None:
engine = await router.engine_for(q)
else:
engine = maybe_engine

if engine == Engine.Aurora:
aurora_query_indices.append(idx)
aurora_queries.append(q)
elif engine == Engine.Athena:
athena_query_indices.append(idx)
athena_queries.append(q)

# NOTE: Ideally we would use the actual observed values here instead of the predicted ones.
aurora_accessed_pages = compute_aurora_accessed_pages(
aurora_queries,
workload.get_predicted_aurora_pages_accessed_batch(aurora_query_indices),
Expand All @@ -139,10 +151,11 @@ async def _estimate_current_scan_hourly_cost(self) -> float:
)

# We use the hit rate to estimate Aurora scan costs.
# Note that if we are using I/O optimized Aurora, there is no
# incremental scan cost.
lookback_epochs = math.ceil(
self._planner_config.planning_window() / self._config.epoch_length
)
# TODO: If there are read replicas, we should use the hit rate from them instead.
aurora_reader_metrics = self._monitor.aurora_reader_metrics()
if len(aurora_reader_metrics) > 0:
reader_hit_rates = []
Expand All @@ -168,7 +181,7 @@ async def _estimate_current_scan_hourly_cost(self) -> float:
athena_scanned_bytes, self._planner_config
)

return aurora_scan_cost + athena_scan_cost
return aurora_scan_cost, athena_scan_cost


_HIT_RATE_METRIC = "BufferCacheHitRatio_Average"

0 comments on commit 27211d8

Please sign in to comment.