diff --git a/config/planner.yml b/config/planner.yml index e8922e18..f1fcd9c7 100644 --- a/config/planner.yml +++ b/config/planner.yml @@ -52,6 +52,8 @@ triggers: ceiling_s: 0.030 sustained_epochs: 3 + recent_change: {} + ### ### Beam planning constants. ### @@ -348,3 +350,5 @@ redshift_scaling: use_io_optimized_aurora: true use_recorded_routing_if_available: true + +aurora_initialize_load_fraction: 0.25 diff --git a/experiments/15-e2e-scenarios-v2/scale_down/run_workload.sh b/experiments/15-e2e-scenarios-v2/scale_down/run_workload.sh index d0008c4a..2af2e783 100755 --- a/experiments/15-e2e-scenarios-v2/scale_down/run_workload.sh +++ b/experiments/15-e2e-scenarios-v2/scale_down/run_workload.sh @@ -20,10 +20,10 @@ log_workload_point "brad_start_initiated" sleep 30 log_workload_point "clients_starting" -start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8" +start_repeating_olap_runner 6 15 5 $ra_query_indexes "ra_8" rana_pid=$runner_pid -start_txn_runner 4 +start_txn_runner 3 txn_pid=$runner_pid start_repeating_olap_runner 1 70 5 "61,71,75" "ra_1_special" @@ -46,7 +46,7 @@ trap "inner_cancel_experiment" TERM total_second_phase_time_s="$((60 * 60))" wait_start="$(date -u +%s)" -poll_file_for_event $COND_OUT/brad_daemon_events.csv "post_transition_completed" 45 +poll_file_for_event $COND_OUT/brad_daemon_events.csv "post_transition_completed" 30 log_workload_point "after_replan" wait_end="$(date -u +%s)" diff --git a/experiments/15-e2e-scenarios-v2/scale_down/run_workload_debug.sh b/experiments/15-e2e-scenarios-v2/scale_down/run_workload_debug.sh index 57b41073..cc754f35 100755 --- a/experiments/15-e2e-scenarios-v2/scale_down/run_workload_debug.sh +++ b/experiments/15-e2e-scenarios-v2/scale_down/run_workload_debug.sh @@ -23,10 +23,10 @@ export BRAD_INITIAL_ROUTE_REDSHIFT_ONLY=1 start_brad_debug $config_file $planner_config_file sleep 10 -start_repeating_olap_runner 8 15 5 $ra_query_indexes "ra_8" +start_repeating_olap_runner 6 15 5 $ra_query_indexes "ra_8" rana_pid=$runner_pid -start_txn_runner 4 +start_txn_runner 3 txn_pid=$runner_pid start_repeating_olap_runner 1 70 5 "61,71,75" "ra_1_special" diff --git a/src/brad/admin/run_planner.py b/src/brad/admin/run_planner.py index 857e75ba..4f1fdd85 100644 --- a/src/brad/admin/run_planner.py +++ b/src/brad/admin/run_planner.py @@ -2,7 +2,7 @@ import logging import pathlib import pytz -from typing import Dict +from typing import Dict, Optional from datetime import timedelta, datetime from brad.asset_manager import AssetManager @@ -21,6 +21,7 @@ from brad.planner.scoring.performance.precomputed_predictions import ( PrecomputedPredictions, ) +from brad.planner.triggers.trigger import Trigger from brad.planner.metrics import ( MetricsFromMonitor, FixedMetricsProvider, @@ -248,7 +249,9 @@ async def run_planner_impl(args) -> None: ) asyncio.run(monitor.fetch_latest()) - async def on_new_blueprint(blueprint: Blueprint, score: Score): + async def on_new_blueprint( + blueprint: Blueprint, score: Score, _trigger: Optional[Trigger] + ): logger.info("Selected new blueprint") logger.info("%s", blueprint) @@ -276,7 +279,7 @@ async def on_new_blueprint(blueprint: Blueprint, score: Score): event_loop = asyncio.new_event_loop() event_loop.set_debug(enabled=args.debug) asyncio.set_event_loop(event_loop) - asyncio.run(planner.run_replan()) + asyncio.run(planner.run_replan(trigger=None)) if args.save_pickle: workload.serialize_for_debugging( diff --git a/src/brad/config/planner.py b/src/brad/config/planner.py index 111b7195..dea4a06e 100644 --- a/src/brad/config/planner.py +++ b/src/brad/config/planner.py @@ -230,3 +230,6 @@ def flag(self, key: str, default: bool = False) -> bool: return default else: return self._raw[key] + + def aurora_initialize_load_fraction(self) -> float: + return self._raw["aurora_initialize_load_fraction"] diff --git a/src/brad/daemon/daemon.py b/src/brad/daemon/daemon.py index 69401492..3fc14bbd 100644 --- a/src/brad/daemon/daemon.py +++ b/src/brad/daemon/daemon.py @@ -47,6 +47,8 @@ from brad.planner.scoring.performance.precomputed_predictions import ( PrecomputedPredictions, ) +from brad.planner.triggers.trigger import Trigger +from brad.planner.triggers.recent_change import RecentChange from brad.planner.workload import Workload from brad.planner.workload.builder import WorkloadBuilder from brad.planner.workload.provider import LoggedWorkloadProvider @@ -339,7 +341,9 @@ async def _read_front_end_messages(self, front_end: "_FrontEndProcess") -> None: str(message), ) - async def _handle_new_blueprint(self, blueprint: Blueprint, score: Score) -> None: + async def _handle_new_blueprint( + self, blueprint: Blueprint, score: Score, trigger: Optional[Trigger] + ) -> None: """ Informs the server about a new blueprint. """ @@ -351,7 +355,7 @@ async def _handle_new_blueprint(self, blueprint: Blueprint, score: Score) -> Non if self._system_event_logger is not None: self._system_event_logger.log(SystemEvent.NewBlueprintProposed) - if self._should_skip_blueprint(blueprint, score): + if self._should_skip_blueprint(blueprint, score, trigger): if self._system_event_logger is not None: self._system_event_logger.log(SystemEvent.NewBlueprintSkipped) return @@ -383,7 +387,9 @@ async def _handle_new_blueprint(self, blueprint: Blueprint, score: Score) -> Non ) self._transition_task = asyncio.create_task(self._run_transition_part_one()) - def _should_skip_blueprint(self, blueprint: Blueprint, score: Score) -> bool: + def _should_skip_blueprint( + self, blueprint: Blueprint, score: Score, trigger: Optional[Trigger] + ) -> bool: """ This is called whenever the planner chooses a new blueprint. The purpose is to avoid transitioning to blueprints with few changes. @@ -407,15 +413,34 @@ def _should_skip_blueprint(self, blueprint: Blueprint, score: Score) -> bool: logger.info("Planner selected an identical blueprint - skipping.") return True - if diff.aurora_diff() is not None or diff.redshift_diff() is not None: - return False - current_score = self._blueprint_mgr.get_active_score() if current_score is None: # Do not skip - we are currently missing the score of the active # blueprint, so there is nothing to compare to. return False + if trigger is not None and isinstance(trigger, RecentChange): + # TODO(geoffxy): This should eventually be removed. The right + # solution is doing a comparison during blueprint planning. Added + # temporarily on November 2, 2023. + current_cost = ( + current_score.provisioning_cost + + current_score.workload_scan_cost + + current_score.storage_cost + ) + next_cost = ( + score.provisioning_cost + score.workload_scan_cost + score.storage_cost + ) + + if next_cost > current_cost: + logger.info( + "Skipping a RecentChange triggered blueprint because a higher-cost blueprint was selected." + ) + return True + + if diff.aurora_diff() is not None or diff.redshift_diff() is not None: + return False + current_dist = current_score.normalized_query_count_distribution() next_dist = score.normalized_query_count_distribution() abs_delta = np.abs(next_dist - current_dist).sum() @@ -534,7 +559,9 @@ async def _handle_internal_command(self, command: str) -> RowList: if self._system_event_logger is not None: self._system_event_logger.log(SystemEvent.ManuallyTriggeredReplan) try: - await self._planner.run_replan(window_multiplier) + await self._planner.run_replan( + trigger=None, window_multiplier=window_multiplier + ) return [("Planner completed. See the daemon's logs for more details.",)] except Exception as ex: logger.exception("Encountered exception when running the planner.") @@ -550,9 +577,16 @@ async def _run_transition_part_one(self) -> None: transitioning_to_version = tm.next_version if self._system_event_logger is not None: + next_blueprint = tm.next_blueprint + assert next_blueprint is not None + next_aurora = str(next_blueprint.aurora_provisioning()) + next_redshift = str(next_blueprint.redshift_provisioning()) + self._system_event_logger.log( SystemEvent.PreTransitionStarted, - "version={}".format(transitioning_to_version), + f"version={transitioning_to_version}," + f"aurora={next_aurora}," + f"redshift={next_redshift}", ) def update_monitor_sources(): diff --git a/src/brad/planner/abstract.py b/src/brad/planner/abstract.py index c6c799f2..4f9e55ec 100644 --- a/src/brad/planner/abstract.py +++ b/src/brad/planner/abstract.py @@ -19,7 +19,9 @@ logger = logging.getLogger(__name__) -NewBlueprintCallback = Callable[[Blueprint, Score], Coroutine[None, None, None]] +NewBlueprintCallback = Callable[ + [Blueprint, Score, Optional[Trigger]], Coroutine[None, None, None] +] class BlueprintPlanner: @@ -98,7 +100,7 @@ async def run_forever(self) -> None: SystemEvent.TriggeredReplan, "trigger={}".format(t.name()), ) - await self.run_replan() + await self.run_replan(trigger=t) break else: logger.debug( @@ -106,7 +108,9 @@ async def run_forever(self) -> None: ) await asyncio.sleep(check_period) - async def run_replan(self, window_multiplier: int = 1) -> None: + async def run_replan( + self, trigger: Optional[Trigger], window_multiplier: int = 1 + ) -> None: """ Triggers a "forced" replan. Used for debugging. @@ -114,11 +118,15 @@ async def run_replan(self, window_multiplier: int = 1) -> None: """ try: self._replan_in_progress = True - await self._run_replan_impl(window_multiplier) + for t in self.get_triggers(): + t.on_replan(trigger) + await self._run_replan_impl(trigger, window_multiplier) finally: self._replan_in_progress = False - async def _run_replan_impl(self, window_multiplier: int = 1) -> None: + async def _run_replan_impl( + self, trigger: Optional[Trigger], window_multiplier: int = 1 + ) -> None: """ Implementers should override this method to define the blueprint optimization algorithm. @@ -166,12 +174,14 @@ def register_new_blueprint_callback(self, callback: NewBlueprintCallback) -> Non """ self._callbacks.append(callback) - async def _notify_new_blueprint(self, blueprint: Blueprint, score: Score) -> None: + async def _notify_new_blueprint( + self, blueprint: Blueprint, score: Score, trigger: Optional[Trigger] + ) -> None: """ Concrete planners should call this method to notify subscribers about the next blueprint. """ tasks = [] for callback in self._callbacks: - tasks.append(asyncio.create_task(callback(blueprint, score))) + tasks.append(asyncio.create_task(callback(blueprint, score, trigger))) await asyncio.gather(*tasks) diff --git a/src/brad/planner/beam/query_based.py b/src/brad/planner/beam/query_based.py index 51af6345..30fbb581 100644 --- a/src/brad/planner/beam/query_based.py +++ b/src/brad/planner/beam/query_based.py @@ -3,7 +3,7 @@ import json import logging from datetime import timedelta -from typing import Iterable, List +from typing import Iterable, List, Optional from brad.config.engine import Engine, EngineBitmapValues from brad.planner.abstract import BlueprintPlanner @@ -43,7 +43,9 @@ def __init__(self, *args, **kwargs) -> None: def get_triggers(self) -> Iterable[Trigger]: return self._triggers - async def _run_replan_impl(self, window_multiplier: int = 1) -> None: + async def _run_replan_impl( + self, trigger: Optional[Trigger], window_multiplier: int = 1 + ) -> None: logger.info("Running a replan...") # 1. Fetch the next workload and apply predictions. @@ -339,4 +341,4 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None: "Metrics used during planning: %s", json.dumps(metrics._asdict(), indent=2) ) - await self._notify_new_blueprint(best_blueprint, best_blueprint_score) + await self._notify_new_blueprint(best_blueprint, best_blueprint_score, trigger) diff --git a/src/brad/planner/beam/table_based.py b/src/brad/planner/beam/table_based.py index 09bb5324..05eae9ac 100644 --- a/src/brad/planner/beam/table_based.py +++ b/src/brad/planner/beam/table_based.py @@ -4,7 +4,7 @@ import json import logging from datetime import timedelta -from typing import Iterable, List, Tuple, Dict +from typing import Iterable, List, Tuple, Dict, Optional from brad.config.engine import Engine, EngineBitmapValues from brad.planner.abstract import BlueprintPlanner @@ -49,7 +49,9 @@ def _check_if_metrics_warrant_replanning(self) -> bool: # process. return False - async def _run_replan_impl(self, window_multiplier: int = 1) -> None: + async def _run_replan_impl( + self, trigger: Optional[Trigger], window_multiplier: int = 1 + ) -> None: logger.info("Running a replan...") # 1. Fetch metrics and the next workload and then apply predictions. @@ -325,7 +327,7 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None: "Metrics used during planning: %s", json.dumps(metrics._asdict(), indent=2) ) - await self._notify_new_blueprint(best_blueprint, best_blueprint_score) + await self._notify_new_blueprint(best_blueprint, best_blueprint_score, trigger) def _preprocess_workload_queries( self, workload: Workload diff --git a/src/brad/planner/beam/triggers.py b/src/brad/planner/beam/triggers.py index 078b8c0c..b50dd3ed 100644 --- a/src/brad/planner/beam/triggers.py +++ b/src/brad/planner/beam/triggers.py @@ -9,6 +9,7 @@ from brad.planner.triggers.redshift_cpu_utilization import RedshiftCpuUtilization from brad.planner.triggers.elapsed_time import ElapsedTimeTrigger from brad.planner.triggers.query_latency_ceiling import QueryLatencyCeiling +from brad.planner.triggers.recent_change import RecentChange from brad.planner.triggers.trigger import Trigger from brad.planner.triggers.txn_latency_ceiling import TransactionLatencyCeiling from brad.planner.triggers.variable_costs import VariableCosts @@ -89,4 +90,8 @@ def get_beam_triggers( ) ) + recent_change = trigger_config["recent_change"] + if "disabled" not in recent_change: + trigger_list.append(RecentChange(planner_config, config.epoch_length)) + return trigger_list diff --git a/src/brad/planner/neighborhood/neighborhood.py b/src/brad/planner/neighborhood/neighborhood.py index dac78b38..2489c36e 100644 --- a/src/brad/planner/neighborhood/neighborhood.py +++ b/src/brad/planner/neighborhood/neighborhood.py @@ -79,7 +79,7 @@ async def run_forever(self) -> None: await asyncio.sleep(3) logger.debug("Planner is checking if a replan is needed...") if self._check_if_metrics_warrant_replanning(): - await self.run_replan() + await self.run_replan(trigger=None) finally: if self._metrics_out is not None: self._metrics_out.close() @@ -88,7 +88,9 @@ def get_triggers(self) -> Iterable[Trigger]: # TODO: Add triggers if needed. return [] - async def _run_replan_impl(self, window_multiplier: int = 1) -> None: + async def _run_replan_impl( + self, trigger: Optional[Trigger], window_multiplier: int = 1 + ) -> None: # This will be long-running and will block the event loop. For our # current needs, this is fine since the planner is the main component in # the daemon process. @@ -182,7 +184,9 @@ async def _run_replan_impl(self, window_multiplier: int = 1) -> None: selected_score = Score() self._last_suggested_blueprint = selected_blueprint self._last_suggested_blueprint_score = selected_score - await self._notify_new_blueprint(selected_blueprint, selected_score) + await self._notify_new_blueprint( + selected_blueprint, selected_score, trigger + ) finally: engines.close_sync() diff --git a/src/brad/planner/scoring/performance/unified_aurora.py b/src/brad/planner/scoring/performance/unified_aurora.py index 18315e89..c6b30cd2 100644 --- a/src/brad/planner/scoring/performance/unified_aurora.py +++ b/src/brad/planner/scoring/performance/unified_aurora.py @@ -165,8 +165,8 @@ def compute( eps, overall_writer_cpu_util_denorm - pred_txn_cpu_denorm ) - total_analytics_load *= query_factor - total_analytics_cpu_denorm *= query_factor + # total_analytics_load *= query_factor + # total_analytics_cpu_denorm *= query_factor # 4. Compute the workload-affected metrics. # Basically, if there are no replicas, both the analytical and @@ -174,6 +174,17 @@ def compute( if next_aurora_has_replicas: next_num_read_replicas = next_prov.num_nodes() - 1 assert next_num_read_replicas > 0 + + if no_analytics_queries_executed and len(base_query_run_times) > 0: + # We need to use a non-zero load. We use a constant factor to + # prime the system. + total_analytics_load = ( + ctx.planner_config.aurora_initialize_load_fraction() + * aurora_num_cpus(ctx.current_blueprint.aurora_provisioning()) + * ctx.current_blueprint.aurora_provisioning().num_nodes() + ) + total_analytics_cpu_denorm = total_analytics_load + # Divide by the number of read replicas: we assume the load can # be equally divided amongst the replicas. analytics_affected_per_machine_load = ( diff --git a/src/brad/planner/scoring/provisioning.py b/src/brad/planner/scoring/provisioning.py index d003298c..925120dc 100644 --- a/src/brad/planner/scoring/provisioning.py +++ b/src/brad/planner/scoring/provisioning.py @@ -115,7 +115,7 @@ def compute_aurora_transition_time_s( return 0.0 # We transition one instance at a time to minimize disruption. - num_nodes_to_create = new.num_nodes() - old.num_nodes() + num_nodes_to_create = max(new.num_nodes() - old.num_nodes(), 0) if new.instance_type() != old.instance_type(): # We modify "overlapping" nodes. For the primary instance, we actually diff --git a/src/brad/planner/triggers/recent_change.py b/src/brad/planner/triggers/recent_change.py new file mode 100644 index 00000000..a45eb140 --- /dev/null +++ b/src/brad/planner/triggers/recent_change.py @@ -0,0 +1,69 @@ +import logging +import pytz +from datetime import timedelta, datetime +from typing import Optional + +from brad.config.planner import PlannerConfig +from brad.blueprint import Blueprint +from brad.blueprint.diff.blueprint import BlueprintDiff +from brad.planner.scoring.score import Score + +from .trigger import Trigger + +logger = logging.getLogger(__name__) + + +class RecentChange(Trigger): + """ + This triggers a replan if there was a recent provisioning change. + """ + + def __init__(self, planner_config: PlannerConfig, epoch_length: timedelta) -> None: + super().__init__(epoch_length) + self._planner_config = planner_config + self._is_first_change = True + self._last_provisioning_change: Optional[datetime] = None + + async def should_replan(self) -> bool: + if self._last_provisioning_change is None: + return False + + window = self._planner_config.planning_window() + now = datetime.now(tz=pytz.utc) + + if now >= self._last_provisioning_change + window: + self._last_provisioning_change = None + logger.info("Triggering replan because of a recent provisioning change.") + return True + else: + return False + + def on_replan(self, trigger: Optional["Trigger"]) -> None: + logger.debug( + "Clearing RecentChange trigger replan state due to %s firing", + trigger.name() if trigger is not None else "manual", + ) + self._last_provisioning_change = None + + def update_blueprint(self, blueprint: Blueprint, score: Optional[Score]) -> None: + if self._is_first_change or self._current_blueprint is None: + self._is_first_change = False + super().update_blueprint(blueprint, score) + return + + prev_blueprint = self._current_blueprint + super().update_blueprint(blueprint, score) + + diff = BlueprintDiff.of(prev_blueprint, blueprint) + if diff is None: + self._last_provisioning_change = None + return + + aurora_diff = diff.aurora_diff() + redshift_diff = diff.redshift_diff() + if aurora_diff is not None or redshift_diff is not None: + self._last_provisioning_change = datetime.now(tz=pytz.utc) + logger.info( + "RecentChangeTrigger: Will trigger one planning window after %s", + self._last_provisioning_change.strftime("%Y-%m-%d_%H-%M-%S"), + ) diff --git a/src/brad/planner/triggers/trigger.py b/src/brad/planner/triggers/trigger.py index 8b5558b0..d00ad500 100644 --- a/src/brad/planner/triggers/trigger.py +++ b/src/brad/planner/triggers/trigger.py @@ -32,6 +32,13 @@ def name(self) -> str: """ return self.__class__.__name__ + def on_replan(self, trigger: Optional["Trigger"]) -> None: + """ + Called when a replan occurs (for stateful triggers). The trigger that + fired the replan will be passed in; it will be None if the replan was + triggered manually. + """ + def _reset_cutoff(self) -> None: self._cutoff = datetime.now(tz=pytz.utc)