Consider functionality constraints during planning
geoffxy committed Nov 13, 2023
1 parent 17f6665 commit 54e1e45
Showing 3 changed files with 15 additions and 11 deletions.
21 changes: 12 additions & 9 deletions src/brad/planner/beam/query_based.py
@@ -137,24 +137,21 @@ async def _run_replan_impl(
         ctx.compute_engine_latency_norm_factor()
 
         beam_size = self._planner_config.beam_size()
-        engines = [Engine.Aurora, Engine.Redshift, Engine.Athena]
         first_query_idx = query_indices[0]
+        first_query = analytical_queries[first_query_idx]
         current_top_k: List[BlueprintCandidate] = []
 
-        # Not a fundamental limitation, but it simplifies the implementation
-        # below if this condition is true.
-        assert beam_size >= len(engines)
-
         # 4. Initialize the top-k set (beam).
-        for routing_engine in engines:
+        for routing_engine in Engine.from_bitmap(
+            planning_router.run_functionality_routing(first_query)
+        ):
             candidate = BlueprintCandidate.based_on(
                 self._current_blueprint, self._comparator
             )
             candidate.add_transactional_tables(ctx)
-            query = analytical_queries[first_query_idx]
             candidate.add_query(
                 first_query_idx,
-                query,
+                first_query,
                 routing_engine,
                 next_workload.get_predicted_analytical_latency(
                     first_query_idx, routing_engine
@@ -187,10 +184,16 @@ async def _run_replan_impl(
             next_top_k: List[BlueprintCandidate] = []
             query = analytical_queries[query_idx]
 
+            # Only a subset of the engines may support this query if it uses
+            # "special functionality".
+            engine_candidates = Engine.from_bitmap(
+                planning_router.run_functionality_routing(query)
+            )
+
            # For each candidate in the current top k, expand it by one
            # query in the workload.
            for curr_candidate in current_top_k:
-                for routing_engine in engines:
+                for routing_engine in engine_candidates:
                     next_candidate = curr_candidate.clone()
                     next_candidate.add_query(
                         query_idx,
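For readers unfamiliar with the bitmap convention used above: run_functionality_routing returns an integer bitmap of engines, and Engine.from_bitmap expands it into concrete engines for the beam loops to iterate over. The following standalone Python sketch re-creates that idea in isolation; the bit values and the geospatial rule are simplified illustrative assumptions, not BRAD's actual implementation.

# Standalone sketch: how a per-query functionality bitmap narrows the engines a
# beam-search planner considers. Bit values and the geospatial rule below are
# toy assumptions for illustration only.
import enum
from typing import List


class Engine(enum.IntFlag):
    Aurora = 0b001
    Redshift = 0b010
    Athena = 0b100

    @classmethod
    def from_bitmap(cls, bitmap: int) -> List["Engine"]:
        # Expand the bitmap into the list of engines whose bit is set.
        return [e for e in (cls.Aurora, cls.Redshift, cls.Athena) if bitmap & e]


def run_functionality_routing(query_sql: str) -> int:
    # Toy rule: pretend geospatial functions are only supported on Aurora.
    if "ST_Distance" in query_sql:
        return int(Engine.Aurora)
    return int(Engine.Aurora | Engine.Redshift | Engine.Athena)


if __name__ == "__main__":
    for sql in (
        "SELECT COUNT(*) FROM orders",
        "SELECT * FROM geo WHERE ST_Distance(a, b) < 10",
    ):
        candidates = Engine.from_bitmap(run_functionality_routing(sql))
        # The beam loops above would only expand candidates over these engines.
        print(sql, "->", [e.name for e in candidates])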
1 change: 1 addition & 0 deletions src/brad/planner/beam/table_based.py
@@ -102,6 +102,7 @@ async def _run_replan_impl(
         )
 
         # 2. Cluster queries by tables and sort by gains (sum).
+        # TODO: Need to consider functionality when creating clusters.
         clusters = self._preprocess_workload_queries(next_workload)
 
         # Sanity check. We cannot run planning without at least one query in the
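The clustering change itself is left as a TODO in this commit. As a purely hypothetical sketch of what "considering functionality when creating clusters" could mean, one option is to intersect the per-query engine bitmaps inside each table cluster and flag clusters that no single engine can serve; all names, bit values, and sample data below are illustrative assumptions.

# Hypothetical sketch only; this commit leaves the clustering change as a TODO.
from typing import Dict, List, Tuple

ALL_ENGINES = 0b111  # toy bitmap: Aurora | Redshift | Athena


def cluster_support(query_bitmaps: List[int]) -> int:
    # Engines that can serve every query in the cluster (bitwise intersection).
    support = ALL_ENGINES
    for bitmap in query_bitmaps:
        support &= bitmap
    return support


if __name__ == "__main__":
    # Map each table cluster to the functionality bitmaps of its queries.
    clusters: Dict[Tuple[str, ...], List[int]] = {
        ("orders", "lineitem"): [0b111, 0b111],  # any engine can serve both queries
        ("geo_points",): [0b001, 0b111],  # one query pins the cluster to a single engine
    }
    for tables, bitmaps in clusters.items():
        support = cluster_support(bitmaps)
        status = "unroutable as one unit" if support == 0 else bin(support)
        print(tables, "-> shared engine support:", status)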
4 changes: 2 additions & 2 deletions src/brad/routing/router.py
@@ -93,7 +93,7 @@ async def engine_for(
         place_support = self._run_location_routing(query, self._table_placement_bitmap)
 
         # Engine functionality constraints.
-        func_support = self._run_functionality_routing(query)
+        func_support = self.run_functionality_routing(query)
 
         # Get supported engines.
         valid_locations = place_support & func_support
@@ -145,7 +145,7 @@ def engine_for_sync(
         # Ideally we re-implement a sync version.
         return asyncio.run(self.engine_for(query, session))
 
-    def _run_functionality_routing(self, query: QueryRep) -> int:
+    def run_functionality_routing(self, query: QueryRep) -> int:
         """
         Based on the functionalities required by the query (e.g. geospatial),
         compute the set of engines that are able to serve this query.
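As the first hunk shows, run_functionality_routing returns an engine bitmap that engine_for intersects with the table-placement bitmap (valid_locations = place_support & func_support). The standalone sketch below mimics that intersection with simplified stand-ins; the engine bit values, the geospatial rule, and the placement map are illustrative assumptions, not the real Router logic.

# Standalone sketch of combining functionality and placement constraints.
# Engine bits, the geospatial rule, and the placement map are toy assumptions.
from typing import Dict, List, Set

AURORA, REDSHIFT, ATHENA = 0b001, 0b010, 0b100


def functionality_bitmap(query_sql: str) -> int:
    # Toy rule: assume geospatial predicates only run on Aurora.
    if "ST_Contains" in query_sql:
        return AURORA
    return AURORA | REDSHIFT | ATHENA


def placement_bitmap(placement: Dict[int, Set[str]], referenced: List[str]) -> int:
    # An engine qualifies only if it holds every table the query references.
    bitmap = 0
    for engine, tables in placement.items():
        if all(t in tables for t in referenced):
            bitmap |= engine
    return bitmap


if __name__ == "__main__":
    placement = {AURORA: {"users", "geo"}, REDSHIFT: {"users"}, ATHENA: {"users", "geo"}}
    sql = "SELECT * FROM geo WHERE ST_Contains(region, point)"
    valid = placement_bitmap(placement, ["geo"]) & functionality_bitmap(sql)
    print("valid engine bitmap:", bin(valid))  # 0b1 -> only Aurora satisfies both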
