Skip to content

Commit

Permalink
Various blueprint planning improvements and workload adjustments (#347)
Browse files Browse the repository at this point in the history
* Exclude the query factor, add initial starting load

* Adjustments

* Make load fraction configurable

* Add recent change trigger

* Fixes

* Adjust workload again

* Adjust runners

* Pass trigger reason back, adjust skip blueprint check

* Adjust length

* Adjust txn workload

* Re-enable recent change

* Adjust workload

* Stick to 60 minutes
  • Loading branch information
geoffxy authored Nov 2, 2023
1 parent 294840a commit d7aa99f
Show file tree
Hide file tree
Showing 15 changed files with 189 additions and 35 deletions.
4 changes: 4 additions & 0 deletions config/planner.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ triggers:
ceiling_s: 0.030
sustained_epochs: 3

recent_change: {}

###
### Beam planning constants.
###
Expand Down Expand Up @@ -348,3 +350,5 @@ redshift_scaling:

use_io_optimized_aurora: true
use_recorded_routing_if_available: true

aurora_initialize_load_fraction: 0.25
6 changes: 3 additions & 3 deletions experiments/15-e2e-scenarios-v2/scale_down/run_workload.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ log_workload_point "brad_start_initiated"
sleep 30

log_workload_point "clients_starting"
start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 6 15 5 $ra_query_indexes "ra_8"
rana_pid=$runner_pid

start_txn_runner 4
start_txn_runner 3
txn_pid=$runner_pid

start_repeating_olap_runner 1 70 5 "61,71,75" "ra_1_special"
Expand All @@ -46,7 +46,7 @@ trap "inner_cancel_experiment" TERM
total_second_phase_time_s="$((60 * 60))"
wait_start="$(date -u +%s)"

poll_file_for_event $COND_OUT/brad_daemon_events.csv "post_transition_completed" 45
poll_file_for_event $COND_OUT/brad_daemon_events.csv "post_transition_completed" 30
log_workload_point "after_replan"

wait_end="$(date -u +%s)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ export BRAD_INITIAL_ROUTE_REDSHIFT_ONLY=1
start_brad_debug $config_file $planner_config_file
sleep 10

start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8"
start_repeating_olap_runner 6 15 5 $ra_query_indexes "ra_8"
rana_pid=$runner_pid

start_txn_runner 4
start_txn_runner 3
txn_pid=$runner_pid

start_repeating_olap_runner 1 70 5 "61,71,75" "ra_1_special"
Expand Down
9 changes: 6 additions & 3 deletions src/brad/admin/run_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import pathlib
import pytz
from typing import Dict
from typing import Dict, Optional
from datetime import timedelta, datetime

from brad.asset_manager import AssetManager
Expand All @@ -21,6 +21,7 @@
from brad.planner.scoring.performance.precomputed_predictions import (
PrecomputedPredictions,
)
from brad.planner.triggers.trigger import Trigger
from brad.planner.metrics import (
MetricsFromMonitor,
FixedMetricsProvider,
Expand Down Expand Up @@ -248,7 +249,9 @@ async def run_planner_impl(args) -> None:
)
asyncio.run(monitor.fetch_latest())

async def on_new_blueprint(blueprint: Blueprint, score: Score):
async def on_new_blueprint(
blueprint: Blueprint, score: Score, _trigger: Optional[Trigger]
):
logger.info("Selected new blueprint")
logger.info("%s", blueprint)

Expand Down Expand Up @@ -276,7 +279,7 @@ async def on_new_blueprint(blueprint: Blueprint, score: Score):
event_loop = asyncio.new_event_loop()
event_loop.set_debug(enabled=args.debug)
asyncio.set_event_loop(event_loop)
asyncio.run(planner.run_replan())
asyncio.run(planner.run_replan(trigger=None))

if args.save_pickle:
workload.serialize_for_debugging(
Expand Down
3 changes: 3 additions & 0 deletions src/brad/config/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,6 @@ def flag(self, key: str, default: bool = False) -> bool:
return default
else:
return self._raw[key]

def aurora_initialize_load_fraction(self) -> float:
return self._raw["aurora_initialize_load_fraction"]
50 changes: 42 additions & 8 deletions src/brad/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
from brad.planner.scoring.performance.precomputed_predictions import (
PrecomputedPredictions,
)
from brad.planner.triggers.trigger import Trigger
from brad.planner.triggers.recent_change import RecentChange
from brad.planner.workload import Workload
from brad.planner.workload.builder import WorkloadBuilder
from brad.planner.workload.provider import LoggedWorkloadProvider
Expand Down Expand Up @@ -339,7 +341,9 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None:
str(message),
)

async def _handle_new_blueprint(self, blueprint: Blueprint, score: Score) -> None:
async def _handle_new_blueprint(
self, blueprint: Blueprint, score: Score, trigger: Optional[Trigger]
) -> None:
"""
Informs the server about a new blueprint.
"""
Expand All @@ -351,7 +355,7 @@ async def _handle_new_blueprint(self, blueprint: Blueprint, score: Score) -> Non
if self._system_event_logger is not None:
self._system_event_logger.log(SystemEvent.NewBlueprintProposed)

if self._should_skip_blueprint(blueprint, score):
if self._should_skip_blueprint(blueprint, score, trigger):
if self._system_event_logger is not None:
self._system_event_logger.log(SystemEvent.NewBlueprintSkipped)
return
Expand Down Expand Up @@ -383,7 +387,9 @@ async def _handle_new_blueprint(self, blueprint: Blueprint, score: Score) -> Non
)
self._transition_task = asyncio.create_task(self._run_transition_part_one())

def _should_skip_blueprint(self, blueprint: Blueprint, score: Score) -> bool:
def _should_skip_blueprint(
self, blueprint: Blueprint, score: Score, trigger: Optional[Trigger]
) -> bool:
"""
This is called whenever the planner chooses a new blueprint. The purpose
is to avoid transitioning to blueprints with few changes.
Expand All @@ -407,15 +413,34 @@ def _should_skip_blueprint(self, blueprint: Blueprint, score: Score) -> bool:
logger.info("Planner selected an identical blueprint - skipping.")
return True

if diff.aurora_diff() is not None or diff.redshift_diff() is not None:
return False

current_score = self._blueprint_mgr.get_active_score()
if current_score is None:
# Do not skip - we are currently missing the score of the active
# blueprint, so there is nothing to compare to.
return False

if trigger is not None and isinstance(trigger, RecentChange):
# TODO(geoffxy): This should eventually be removed. The right
# solution is doing a comparison during blueprint planning. Added
# temporarily on November 2, 2023.
current_cost = (
current_score.provisioning_cost
+ current_score.workload_scan_cost
+ current_score.storage_cost
)
next_cost = (
score.provisioning_cost + score.workload_scan_cost + score.storage_cost
)

if next_cost > current_cost:
logger.info(
"Skipping a RecentChange triggered blueprint because a higher-cost blueprint was selected."
)
return True

if diff.aurora_diff() is not None or diff.redshift_diff() is not None:
return False

current_dist = current_score.normalized_query_count_distribution()
next_dist = score.normalized_query_count_distribution()
abs_delta = np.abs(next_dist - current_dist).sum()
Expand Down Expand Up @@ -534,7 +559,9 @@ async def _handle_internal_command(self, command: str) -> RowList:
if self._system_event_logger is not None:
self._system_event_logger.log(SystemEvent.ManuallyTriggeredReplan)
try:
await self._planner.run_replan(window_multiplier)
await self._planner.run_replan(
trigger=None, window_multiplier=window_multiplier
)
return [("Planner completed. See the daemon's logs for more details.",)]
except Exception as ex:
logger.exception("Encountered exception when running the planner.")
Expand All @@ -550,9 +577,16 @@ async def _run_transition_part_one(self) -> None:
transitioning_to_version = tm.next_version

if self._system_event_logger is not None:
next_blueprint = tm.next_blueprint
assert next_blueprint is not None
next_aurora = str(next_blueprint.aurora_provisioning())
next_redshift = str(next_blueprint.redshift_provisioning())

self._system_event_logger.log(
SystemEvent.PreTransitionStarted,
"version={}".format(transitioning_to_version),
f"version={transitioning_to_version},"
f"aurora={next_aurora},"
f"redshift={next_redshift}",
)

def update_monitor_sources():
Expand Down
24 changes: 17 additions & 7 deletions src/brad/planner/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

logger = logging.getLogger(__name__)

NewBlueprintCallback = Callable[[Blueprint, Score], Coroutine[None, None, None]]
NewBlueprintCallback = Callable[
[Blueprint, Score, Optional[Trigger]], Coroutine[None, None, None]
]


class BlueprintPlanner:
Expand Down Expand Up @@ -98,27 +100,33 @@ async def run_forever(self) -> None:
SystemEvent.TriggeredReplan,
"trigger={}".format(t.name()),
)
await self.run_replan()
await self.run_replan(trigger=t)
break
else:
logger.debug(
"A replan is already in progress or triggers are temporarily disabled. Skipping the trigger check."
)
await asyncio.sleep(check_period)

async def run_replan(self, window_multiplier: int = 1) -> None:
async def run_replan(
self, trigger: Optional[Trigger], window_multiplier: int = 1
) -> None:
"""
Triggers a "forced" replan. Used for debugging.
Use `window_multiplier` to expand the window used for planning.
"""
try:
self._replan_in_progress = True
await self._run_replan_impl(window_multiplier)
for t in self.get_triggers():
t.on_replan(trigger)
await self._run_replan_impl(trigger, window_multiplier)
finally:
self._replan_in_progress = False

async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
async def _run_replan_impl(
self, trigger: Optional[Trigger], window_multiplier: int = 1
) -> None:
"""
Implementers should override this method to define the blueprint
optimization algorithm.
Expand Down Expand Up @@ -166,12 +174,14 @@ def register_new_blueprint_callback(self, callback: NewBlueprintCallback) -> Non
"""
self._callbacks.append(callback)

async def _notify_new_blueprint(self, blueprint: Blueprint, score: Score) -> None:
async def _notify_new_blueprint(
self, blueprint: Blueprint, score: Score, trigger: Optional[Trigger]
) -> None:
"""
Concrete planners should call this method to notify subscribers about
the next blueprint.
"""
tasks = []
for callback in self._callbacks:
tasks.append(asyncio.create_task(callback(blueprint, score)))
tasks.append(asyncio.create_task(callback(blueprint, score, trigger)))
await asyncio.gather(*tasks)
8 changes: 5 additions & 3 deletions src/brad/planner/beam/query_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
from datetime import timedelta
from typing import Iterable, List
from typing import Iterable, List, Optional

from brad.config.engine import Engine, EngineBitmapValues
from brad.planner.abstract import BlueprintPlanner
Expand Down Expand Up @@ -43,7 +43,9 @@ def __init__(self, *args, **kwargs) -> None:
def get_triggers(self) -> Iterable[Trigger]:
return self._triggers

async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
async def _run_replan_impl(
self, trigger: Optional[Trigger], window_multiplier: int = 1
) -> None:
logger.info("Running a replan...")

# 1. Fetch the next workload and apply predictions.
Expand Down Expand Up @@ -339,4 +341,4 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
"Metrics used during planning: %s", json.dumps(metrics._asdict(), indent=2)
)

await self._notify_new_blueprint(best_blueprint, best_blueprint_score)
await self._notify_new_blueprint(best_blueprint, best_blueprint_score, trigger)
8 changes: 5 additions & 3 deletions src/brad/planner/beam/table_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import logging
from datetime import timedelta
from typing import Iterable, List, Tuple, Dict
from typing import Iterable, List, Tuple, Dict, Optional

from brad.config.engine import Engine, EngineBitmapValues
from brad.planner.abstract import BlueprintPlanner
Expand Down Expand Up @@ -49,7 +49,9 @@ def _check_if_metrics_warrant_replanning(self) -> bool:
# process.
return False

async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
async def _run_replan_impl(
self, trigger: Optional[Trigger], window_multiplier: int = 1
) -> None:
logger.info("Running a replan...")

# 1. Fetch metrics and the next workload and then apply predictions.
Expand Down Expand Up @@ -325,7 +327,7 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
"Metrics used during planning: %s", json.dumps(metrics._asdict(), indent=2)
)

await self._notify_new_blueprint(best_blueprint, best_blueprint_score)
await self._notify_new_blueprint(best_blueprint, best_blueprint_score, trigger)

def _preprocess_workload_queries(
self, workload: Workload
Expand Down
5 changes: 5 additions & 0 deletions src/brad/planner/beam/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from brad.planner.triggers.redshift_cpu_utilization import RedshiftCpuUtilization
from brad.planner.triggers.elapsed_time import ElapsedTimeTrigger
from brad.planner.triggers.query_latency_ceiling import QueryLatencyCeiling
from brad.planner.triggers.recent_change import RecentChange
from brad.planner.triggers.trigger import Trigger
from brad.planner.triggers.txn_latency_ceiling import TransactionLatencyCeiling
from brad.planner.triggers.variable_costs import VariableCosts
Expand Down Expand Up @@ -89,4 +90,8 @@ def get_beam_triggers(
)
)

recent_change = trigger_config["recent_change"]
if "disabled" not in recent_change:
trigger_list.append(RecentChange(planner_config, config.epoch_length))

return trigger_list
10 changes: 7 additions & 3 deletions src/brad/planner/neighborhood/neighborhood.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def run_forever(self) -> None:
await asyncio.sleep(3)
logger.debug("Planner is checking if a replan is needed...")
if self._check_if_metrics_warrant_replanning():
await self.run_replan()
await self.run_replan(trigger=None)
finally:
if self._metrics_out is not None:
self._metrics_out.close()
Expand All @@ -88,7 +88,9 @@ def get_triggers(self) -> Iterable[Trigger]:
# TODO: Add triggers if needed.
return []

async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
async def _run_replan_impl(
self, trigger: Optional[Trigger], window_multiplier: int = 1
) -> None:
# This will be long-running and will block the event loop. For our
# current needs, this is fine since the planner is the main component in
# the daemon process.
Expand Down Expand Up @@ -182,7 +184,9 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None:
selected_score = Score()
self._last_suggested_blueprint = selected_blueprint
self._last_suggested_blueprint_score = selected_score
await self._notify_new_blueprint(selected_blueprint, selected_score)
await self._notify_new_blueprint(
selected_blueprint, selected_score, trigger
)

finally:
engines.close_sync()
Expand Down
15 changes: 13 additions & 2 deletions src/brad/planner/scoring/performance/unified_aurora.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,26 @@ def compute(
eps, overall_writer_cpu_util_denorm - pred_txn_cpu_denorm
)

total_analytics_load *= query_factor
total_analytics_cpu_denorm *= query_factor
# total_analytics_load *= query_factor
# total_analytics_cpu_denorm *= query_factor

# 4. Compute the workload-affected metrics.
# Basically, if there are no replicas, both the analytical and
# transactional load fall onto one instance (which we need to capture).
if next_aurora_has_replicas:
next_num_read_replicas = next_prov.num_nodes() - 1
assert next_num_read_replicas > 0

if no_analytics_queries_executed and len(base_query_run_times) > 0:
# We need to use a non-zero load. We use a constant factor to
# prime the system.
total_analytics_load = (
ctx.planner_config.aurora_initialize_load_fraction()
* aurora_num_cpus(ctx.current_blueprint.aurora_provisioning())
* ctx.current_blueprint.aurora_provisioning().num_nodes()
)
total_analytics_cpu_denorm = total_analytics_load

# Divide by the number of read replicas: we assume the load can
# be equally divided amongst the replicas.
analytics_affected_per_machine_load = (
Expand Down
Loading

0 comments on commit d7aa99f

Please sign in to comment.