Internal improvements (#362)
* Add constraint to ensure complete dataset is present together

* Account for index size with a multiplier

* Account for storage costs

* Add second half experiment
geoffxy authored Nov 13, 2023
1 parent 4098bbc commit d01c2e0
Showing 11 changed files with 223 additions and 31 deletions.
2 changes: 2 additions & 0 deletions config/planner.yml
@@ -110,6 +110,7 @@ athena_load_rate_mb_per_s: 10.0
s3_usd_per_mb_per_month: 0.000023
aurora_regular_usd_per_mb_per_month: 0.00010
aurora_io_opt_usd_per_mb_per_month: 0.000225
aurora_storage_index_multiplier: 3.0

###
### Dataset-specific Transition Constants
@@ -350,5 +351,6 @@ redshift_scaling:

use_io_optimized_aurora: true
use_recorded_routing_if_available: true
ensure_tables_together_on_one_engine: true

aurora_initialize_load_fraction: 0.25
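
For reference, a minimal sketch (not part of this commit) of how the two new settings are consumed; the accessors mirror the flag() and aurora_storage_index_multiplier() methods added in src/brad/config/planner.py below, and the numeric values are the ones in this config:

import yaml

with open("config/planner.yml") as f:
    raw = yaml.safe_load(f)

# Aurora's billed storage rate is inflated by the index multiplier to
# account for index size on top of the raw table data.
rate = raw["aurora_io_opt_usd_per_mb_per_month"]     # 0.000225
multiplier = raw["aurora_storage_index_multiplier"]  # 3.0
effective_usd_per_mb_per_month = rate * multiplier   # 0.000675

# Boolean flags default to False when absent.
ensure_together = raw.get("ensure_tables_together_on_one_engine", False)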
14 changes: 14 additions & 0 deletions experiments/15-e2e-scenarios-v2/scale_up/COND
@@ -14,6 +14,20 @@ run_experiment(
},
)

run_experiment(
name="brad_100g_2",
run="./run_workload_2.sh",
options={
# TODO: Ideally, configurations are shared. Only keep AWS secrets separate.
"config-file": "config/config_large_100.yml",
"planner-config-file": "config/planner.yml",
"schema-name": "imdb_extended_100g",
"ra-query-bank-file": IMDB_100GB_REGULAR_QUERY_BANK,
"num-front-ends": 28,
"dataset-type": "100gb",
},
)

run_command(
name="brad_100g_debug",
run="./run_workload_debug.sh",
4 changes: 0 additions & 4 deletions experiments/15-e2e-scenarios-v2/scale_up/run_workload.sh
@@ -98,8 +98,6 @@ sleep $((20 * 60)) # 20 mins total; 62 mins cumulative

log_workload_point "stopping_heavy_rana_8"
kill -INT $heavy_rana_pid
-sleep 5
-kill -KILL $heavy_rana_pid # To ensure we do not get stuck.
wait $heavy_rana_pid

log_workload_point "start_heavy_rana_10"
@@ -109,8 +107,6 @@ sleep $((10 * 60)) # 10 mins total; 72 mins cumulative

log_workload_point "stopping_heavy_rana_10"
kill -INT $heavy_rana_pid
-sleep 5
-kill -KILL $heavy_rana_pid # To ensure we do not get stuck.
wait $heavy_rana_pid

log_workload_point "start_heavy_rana_20"
105 changes: 105 additions & 0 deletions experiments/15-e2e-scenarios-v2/scale_up/run_workload_2.sh
@@ -0,0 +1,105 @@
#! /bin/bash

script_loc=$(cd $(dirname $0) && pwd -P)
cd $script_loc
source ../common.sh

# Scenario:
# - Start from 2x t4g.medium Aurora, Redshift off
# - 4 T clients, increasing to 24 T clients by 4 every minute
#   We expect BRAD to scale up Aurora at this point, but not start Redshift (a replica should be fine for the analytical workload)
# - Increase the analytical load + add new "heavier" queries - expect that these go to Athena
# - Increase frequency of queries, expect Redshift to start (go straight to 2x dc2.large to avoid classic resize for practical purposes)

# TODO: This executor file should be adapted to run against the baselines too
# (TiDB / Serverless Redshift + Aurora)

initial_queries="99,56,32,92,91,49,30,83,94,38,87,86,76,37,31,46"
heavier_queries="58,61,62,64,69,73,74,51,57,60"

# NOTE: Defined but never invoked in this script; appears to be an unused stub.
function step_txns() {
local lo=$1
local hi=$2
local gap_minute=$3
}

# Arguments:
# --config-file
# --planner-config-file
# --query-indexes
extract_named_arguments $@

function txn_sweep() {
local sweep=$1
local gap_minute=$2
local keep_last=$3

for t_clients in $sweep; do
start_txn_runner $t_clients # Implicit: --dataset-type
txn_pid=$runner_pid

sleep $(($gap_minute * 60))
if [[ -z $keep_last ]] || [[ $t_clients != $keep_last ]]; then
kill -INT $txn_pid
wait $txn_pid
fi
done
}

function inner_cancel_experiment() {
if [ ! -z $heavy_rana_pid ]; then
cancel_experiment $rana_pid $txn_pid $heavy_rana_pid
else
cancel_experiment $rana_pid $txn_pid
fi
}

trap "inner_cancel_experiment" INT
trap "inner_cancel_experiment" TERM

start_brad $config_file $planner_config_file
log_workload_point "brad_start_initiated"
sleep 30

# Start with 8 analytical clients.
log_workload_point "start_rana_8"
start_repeating_olap_runner 8 15 5 $initial_queries "ra_8"
rana_pid=$runner_pid
sleep 2

# Start with 4 transactional clients; hold for 10 minutes to stabilize.
log_workload_point "start_txn_8"
start_txn_runner 4
txn_pid=$runner_pid
sleep $((10 * 60))

# 20 minutes.
log_workload_point "start_heavy_rana_8"
start_repeating_olap_runner 8 15 1 $heavier_queries "ra_8_heavy" 8
heavy_rana_pid=$runner_pid
sleep $((20 * 60)) # 20 mins total; 30 mins cumulative

log_workload_point "stopping_heavy_rana_8"
kill -INT $heavy_rana_pid
wait $heavy_rana_pid

log_workload_point "start_heavy_rana_10"
start_repeating_olap_runner 10 5 1 $heavier_queries "ra_10_heavy" 8
heavy_rana_pid=$runner_pid
sleep $((10 * 60)) # 10 mins total; 40 mins cumulative

log_workload_point "stopping_heavy_rana_10"
kill -INT $heavy_rana_pid
wait $heavy_rana_pid

log_workload_point "start_heavy_rana_20"
start_repeating_olap_runner 20 5 1 $heavier_queries "ra_20_heavy" 8
heavy_rana_pid=$runner_pid
sleep $((30 * 60)) # 30 mins total; 70 mins cumulative

log_workload_point "experiment_workload_done"

# Shut down everything now.
>&2 echo "Experiment done. Shutting down runners..."
graceful_shutdown $rana_pid $heavy_rana_pid $txn_pid
log_workload_point "shutdown_complete"
3 changes: 3 additions & 0 deletions src/brad/config/planner.py
@@ -233,3 +233,6 @@ def flag(self, key: str, default: bool = False) -> bool:

def aurora_initialize_load_fraction(self) -> float:
return self._raw["aurora_initialize_load_fraction"]

def aurora_storage_index_multiplier(self) -> float:
return float(self._raw["aurora_storage_index_multiplier"])
11 changes: 11 additions & 0 deletions src/brad/planner/beam/query_based.py
@@ -70,6 +70,17 @@ async def _run_replan_impl(
self._providers.data_access_provider.apply_access_statistics(next_workload)
self._providers.data_access_provider.apply_access_statistics(current_workload)

if self._planner_config.flag("ensure_tables_together_on_one_engine"):
# This adds a constraint to ensure all tables are present together
# on at least one engine. This ensures that arbitrary unseen join
# templates can always be immediately handled.
all_tables = ", ".join(
[table.name for table in self._current_blueprint.tables()]
)
next_workload.add_priming_analytical_query(
f"SELECT 1 FROM {all_tables} LIMIT 1"
)

# If requested, we record this planning pass for later debugging.
if (
not self._disable_external_logging
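
To make the new constraint concrete: a short illustration (table names are hypothetical) of the priming query that ensure_tables_together_on_one_engine generates. Because the query joins every table in the blueprint, the router can only satisfy it on an engine that holds the complete dataset, which forces at least one engine to keep all tables together.

tables = ["theatres", "showings", "ticket_orders"]  # from the current blueprint
priming = f"SELECT 1 FROM {', '.join(tables)} LIMIT 1"
# -> SELECT 1 FROM theatres, showings, ticket_orders LIMIT 1
# Added with arrival_count=0 (see workload.py below), so it constrains
# placement without contributing load to the workload model.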
35 changes: 22 additions & 13 deletions src/brad/planner/beam/query_based_candidate.py
@@ -254,8 +254,20 @@ def add_query(
planner_config=ctx.planner_config,
)

-# Table movement costs that this query imposes.
# Storage costs and table movement that this query imposes.
for name, next_placement in table_diffs:
# If we added a table to Athena or Aurora, we need to take into
# account its storage costs.
if (next_placement & EngineBitmapValues[Engine.Athena]) != 0:
# We added the table to Athena.
self.storage_cost += compute_single_athena_table_cost(name, ctx)

if (next_placement & EngineBitmapValues[Engine.Aurora]) != 0:
# Added table to Aurora.
# You only pay for 1 copy of the table on Aurora, regardless of
# how many read replicas you have.
self.storage_cost += compute_single_aurora_table_cost(name, ctx)

curr = ctx.current_blueprint.table_locations_bitmap()[name]
if ((~curr) & next_placement) == 0:
# This table was already present on the engine.
@@ -267,18 +279,6 @@ def add_query(
self.table_movement_trans_cost += result.movement_cost
self.table_movement_trans_time_s += result.movement_time_s

-# If we added a table to Athena or Aurora, we need to take into
-# account its storage costs.
-if (((~curr) & next_placement) & (EngineBitmapValues[Engine.Athena])) != 0:
-# We added the table to Athena.
-self.storage_cost += compute_single_athena_table_cost(name, ctx)
-
-if (((~curr) & next_placement) & (EngineBitmapValues[Engine.Aurora])) != 0:
-# Added table to Aurora.
-# You only pay for 1 copy of the table on Aurora, regardless of
-# how many read replicas you have.
-self.storage_cost += compute_single_aurora_table_cost(name, ctx)

# Adding a new query can affect the feasibility of the provisioning.
self.feasibility = BlueprintFeasibility.Unchecked
self.explored_provisionings = False
@@ -365,6 +365,7 @@ def add_query_last_step(

def add_transactional_tables(self, ctx: ScoringContext) -> None:
referenced_tables = set()
newly_added = set()

# Make sure that tables referenced in transactions are present on
# Aurora.
@@ -373,9 +374,17 @@ def add_transactional_tables(self, ctx: ScoringContext) -> None:
if tbl not in self.table_placements:
# This is a CTE.
continue
orig = self.table_placements[tbl]
self.table_placements[tbl] |= EngineBitmapValues[Engine.Aurora]
referenced_tables.add(tbl)

if ((~orig) & self.table_placements[tbl]) != 0:
newly_added.add(tbl)

# Account for storage costs (Aurora only charges for 1 copy).
for tbl in newly_added:
self.storage_cost += compute_single_aurora_table_cost(tbl, ctx)

# Update the table movement score if needed.
for tbl in referenced_tables:
cur = ctx.current_blueprint.table_locations_bitmap()[tbl]
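
A sketch of the bitmap arithmetic these candidates use (the bit assignments below are illustrative; BRAD's actual EngineBitmapValues may differ). Note that the relocated block also changes the condition: storage is now charged whenever the candidate places a table on Athena or Aurora, not only when the placement newly adds that engine.

AURORA, REDSHIFT, ATHENA = 0b001, 0b010, 0b100  # illustrative bit values

curr = AURORA           # table currently lives on Aurora only
nxt = AURORA | ATHENA   # candidate keeps Aurora and adds Athena

on_athena = (nxt & ATHENA) != 0          # True -> charge Athena storage (new check)
newly_added = (~curr) & nxt              # == ATHENA: only the new copy must move
already_present = ((~curr) & nxt) == 0   # False -> movement costs apply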
11 changes: 11 additions & 0 deletions src/brad/planner/beam/table_based.py
@@ -70,6 +70,17 @@ async def _run_replan_impl(
self._providers.data_access_provider.apply_access_statistics(next_workload)
self._providers.data_access_provider.apply_access_statistics(current_workload)

if self._planner_config.flag("ensure_tables_together_on_one_engine"):
# This adds a constraint to ensure all tables are present together
# on at least one engine. This ensures that arbitrary unseen join
# templates can always be immediately handled.
all_tables = ", ".join(
[table.name for table in self._current_blueprint.tables()]
)
next_workload.add_priming_analytical_query(
f"SELECT 1 FROM {all_tables} LIMIT 1"
)

if (
not self._disable_external_logging
and BlueprintPickleDebugLogger.is_log_requested(self._config)
33 changes: 21 additions & 12 deletions src/brad/planner/beam/table_based_candidate.py
@@ -190,6 +190,18 @@ def add_placement(
changed_tables.append(table_name)
changed = True

# If we added the table to Athena or Aurora, we need to take into
# account its storage costs.
if (((~cur) & nxt) & (EngineBitmapValues[Engine.Athena])) != 0:
# We added the table to Athena.
self.storage_cost += compute_single_athena_table_cost(table_name, ctx)

if (((~cur) & nxt) & (EngineBitmapValues[Engine.Aurora])) != 0:
# Added table to Aurora.
# You only pay for 1 copy of the table on Aurora, regardless of
# how many read replicas you have.
self.storage_cost += compute_single_aurora_table_cost(table_name, ctx)

# Update movement scoring.
self._update_movement_score(changed_tables, ctx)

@@ -295,6 +307,7 @@ async def _route_queries_compute_scan_stats(

def add_transactional_tables(self, ctx: ScoringContext) -> None:
referenced_tables = set()
newly_added = set()

# Make sure that tables referenced in transactions are present on
# Aurora.
@@ -303,9 +316,17 @@ def add_transactional_tables(self, ctx: ScoringContext) -> None:
if tbl not in self.table_placements:
# This is a CTE.
continue
orig = self.table_placements[tbl]
self.table_placements[tbl] |= EngineBitmapValues[Engine.Aurora]
referenced_tables.add(tbl)

if ((~orig) & self.table_placements[tbl]) != 0:
newly_added.add(tbl)

for tbl in newly_added:
# Aurora only charges for 1 copy of the data.
self.storage_cost += compute_single_aurora_table_cost(tbl, ctx)

# If we made a change to the table placement, see if it corresponds to a
# table score change.
self._update_movement_score(referenced_tables, ctx)
@@ -323,18 +344,6 @@ def _update_movement_score(
self.table_movement_trans_cost += result.movement_cost
self.table_movement_trans_time_s += result.movement_time_s

-# If we added the table to Athena or Aurora, we need to take into
-# account its storage costs.
-if (((~cur) & nxt) & (EngineBitmapValues[Engine.Athena])) != 0:
-# We added the table to Athena.
-self.storage_cost += compute_single_athena_table_cost(tbl, ctx)
-
-if (((~cur) & nxt) & (EngineBitmapValues[Engine.Aurora])) != 0:
-# Added table to Aurora.
-# You only pay for 1 copy of the table on Aurora, regardless of
-# how many read replicas you have.
-self.storage_cost += compute_single_aurora_table_cost(tbl, ctx)

def try_to_make_feasible_if_needed(self, ctx: ScoringContext) -> None:
"""
Checks if this blueprint is already feasible, and if so, does nothing
8 changes: 6 additions & 2 deletions src/brad/planner/scoring/table_placement.py
@@ -43,11 +43,15 @@ def compute_single_aurora_table_cost(table_name: str, ctx: ScoringContext) -> float:

if ctx.planner_config.use_io_optimized_aurora():
storage_usd_per_mb_per_month += (
-raw_extract_mb * ctx.planner_config.aurora_io_opt_usd_per_mb_per_month()
raw_extract_mb
* ctx.planner_config.aurora_io_opt_usd_per_mb_per_month()
* ctx.planner_config.aurora_storage_index_multiplier()
)
else:
storage_usd_per_mb_per_month += (
-raw_extract_mb * ctx.planner_config.aurora_regular_usd_per_mb_per_month()
raw_extract_mb
* ctx.planner_config.aurora_regular_usd_per_mb_per_month()
* ctx.planner_config.aurora_storage_index_multiplier()
)

source_period = timedelta(days=30)
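
A worked example of the updated Aurora storage cost (rates taken from config/planner.yml above; the table size and planning window are made up):

raw_extract_mb = 10_000   # hypothetical 10 GB table
rate = 0.000225           # aurora_io_opt_usd_per_mb_per_month
index_multiplier = 3.0    # aurora_storage_index_multiplier

monthly_usd = raw_extract_mb * rate * index_multiplier  # 6.75 USD per 30 days
# The planner prorates the 30-day rate (source_period above) down to the
# length of the planning window, e.g. one hour:
window_usd = monthly_usd / (30 * 24)  # ~0.0094 USD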
28 changes: 28 additions & 0 deletions src/brad/planner/workload/workload.py
@@ -103,6 +103,33 @@ def __init__(
self._table_sizes_mb: Dict[Tuple[str, Engine], int] = {}
self._dataset_size_mb = 0

def add_priming_analytical_query(self, query_str: str) -> None:
"""
Used to add queries to the workload that should be used during planning
as "constraints". This should be called after the workload/statistics
providers have been applied.
"""
query = Query(query_str, arrival_count=0)
self._analytical_queries.append(query)

if self._predicted_analytical_latencies is not None:
self._predicted_analytical_latencies = np.append(
self._predicted_analytical_latencies, np.zeros((1, 3)), axis=0
)
if self._predicted_aurora_pages_accessed is not None:
self._predicted_aurora_pages_accessed = np.append(
self._predicted_aurora_pages_accessed, np.zeros((1,)), axis=0
)
if self._predicted_athena_bytes_accessed is not None:
self._predicted_athena_bytes_accessed = np.append(
self._predicted_athena_bytes_accessed, np.zeros((1,)), axis=0
)
self._query_index_mapping.append(-1)

self._analytical_query_arrival_counts = np.append(
self._analytical_query_arrival_counts, np.zeros((1,)), axis=0
)

def clone(self) -> "Workload":
workload = Workload(
self._period,
@@ -218,6 +245,7 @@ def compute_latency_gains(self) -> npt.NDArray:
ratios.append(preds[:, j] / preds[:, i])
combined = np.stack(ratios, axis=1)
gains = np.amax(combined, axis=1)
gains[~np.isfinite(gains)] = 0.0
return gains

###
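
The one-line guard added to compute_latency_gains handles the priming queries introduced above: their predicted latencies are appended as zeros, so the engine-to-engine ratios become 0/0 = nan (or x/0 = inf) and would otherwise leak into the gain scores. A minimal repro with synthetic numbers:

import numpy as np

preds = np.array([[2.0, 4.0, 8.0],    # an ordinary query
                  [0.0, 0.0, 0.0]])   # a priming query (all-zero row)
with np.errstate(divide="ignore", invalid="ignore"):
    combined = np.stack([preds[:, 1] / preds[:, 0],
                         preds[:, 2] / preds[:, 0]], axis=1)
gains = np.amax(combined, axis=1)     # [4.0, nan]
gains[~np.isfinite(gains)] = 0.0      # [4.0, 0.0]: priming queries add no gain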
