-
Notifications
You must be signed in to change notification settings - Fork 227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(query): pushdown joins only if lhs/rhs shard keys are identical #1513
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -365,6 +365,24 @@ class SingleClusterPlanner(val dataset: Dataset, | |
} | ||
// scalastyle:on cyclomatic.complexity | ||
|
||
/** | ||
* Return the target-schema labels (if any) that will be used to materialize a RawSeries. | ||
*/ | ||
private def getTargetSchemaLabelsFromRawSeries(rs: RawSeries, qContext: QueryContext): Option[Seq[String]] = { | ||
val tsp = targetSchemaProvider(qContext) | ||
val filters = getColumnFilterGroup(rs).map(_.toSeq) | ||
// Get time params from the RangeSelector, and use them to identify a TargetSchemaChanges. | ||
val startEndMsPairOpt = rs.rangeSelector match { | ||
case is: IntervalSelector => Some((is.from, is.to)) | ||
case _ => None | ||
} | ||
val tsFunc = tsp.targetSchemaFunc(filters.head) | ||
startEndMsPairOpt match { | ||
case Some((startMs, endMs)) => findTargetSchema(tsFunc, startMs, endMs).map(_.schema) | ||
case _ => None | ||
} | ||
} | ||
|
||
// scalastyle:off method.length | ||
/** | ||
* Returns the shards spanned by a LogicalPlan's data. | ||
|
@@ -387,18 +405,11 @@ class SingleClusterPlanner(val dataset: Dataset, | |
assert(filters.size == 1, s"expected leaf plan to yield single filter group, but got ${filters.size}") | ||
leafPlan match { | ||
case rs: RawSeries => | ||
// Get time params from the RangeSelector, and use them to identify a TargetSchemaChanges. | ||
val startEndMsPairOpt = rs.rangeSelector match { | ||
case is: IntervalSelector => Some((is.from, is.to)) | ||
case _ => None | ||
} | ||
val tsLabels: Option[Seq[String]] = { | ||
val tsFunc = tsp.targetSchemaFunc(filters.head) | ||
startEndMsPairOpt match { | ||
case Some((startMs, endMs)) => findTargetSchema(tsFunc, startMs, endMs).map(_.schema) | ||
case _ => None | ||
} | ||
} | ||
val tsLabels: Option[Seq[String]] = getTargetSchemaLabelsFromRawSeries(rs, qContext) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why a method? This lead to code duplication and doing |
||
val (startMs, endMs): (Long, Long) = startEndMsPairOpt.getOrElse((0, Long.MaxValue)) | ||
leafInfos.append(LeafInfo(leafPlan, startMs, endMs, renameMetricFilter(filters.head), tsLabels)) | ||
// Do nothing; not pulling data from any shards. | ||
|
@@ -435,7 +446,10 @@ class SingleClusterPlanner(val dataset: Dataset, | |
*/ | ||
private def getPushdownShards(qContext: QueryContext, | ||
lp: LogicalPlan): Option[Set[Int]] = { | ||
def helper(lp: LogicalPlan): Option[Set[Int]] = lp match { | ||
case class PushdownData(shards: Set[Int], | ||
shardKeys: Map[String, String], | ||
tschemaLabels: Set[String]) | ||
def helper(lp: LogicalPlan): Option[PushdownData] = lp match { | ||
// VectorPlans can't currently be pushed down. Consider: | ||
// foo{...} or vector(0) | ||
// If foo{...} is sharded with spread=1, but both shards contain no foo{...} data, | ||
|
@@ -461,49 +475,61 @@ class SingleClusterPlanner(val dataset: Dataset, | |
case aif: ApplyInstantFunction => helper(aif.vectors) | ||
case bj: BinaryJoin => | ||
// lhs/rhs must reside on the same set of shards, and target schema labels for all leaves must be | ||
// discoverable, equal, and preserved by join keys | ||
val lhsShards = helper(bj.lhs) | ||
val rhsShards = helper(bj.rhs) | ||
val canPushdown = lhsShards != None && rhsShards != None && | ||
// either the shard groups are equal, or either of lhs/rhs includes only scalars. | ||
(lhsShards == rhsShards || lhsShards.get.isEmpty || rhsShards.get.isEmpty) && | ||
{ | ||
val targetSchemaLabels = | ||
getUniversalTargetSchemaLabels(bj, targetSchemaProvider(qContext)) | ||
targetSchemaLabels.isDefined && | ||
targetSchemaLabels.get.toSet.diff(shardColumns.toSet).subsetOf(bj.on.toSet) | ||
} | ||
// union lhs/rhs shards, since one might be empty (if it's a scalar) | ||
if (canPushdown) Some(lhsShards.get.union(rhsShards.get)) else None | ||
// discoverable, equal, and preserved by join keys. | ||
val lhsData = helper(bj.lhs) | ||
val rhsData = helper(bj.rhs) | ||
val canPushdown = lhsData.isDefined && rhsData.isDefined && { | ||
val hasScalar = lhsData.get.shards.isEmpty || rhsData.get.shards.isEmpty | ||
val canPushdownJoin = lhsData.get.shards == rhsData.get.shards && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the order of Also too much going on here, some comments would really be helpful to understand the intent. |
||
lhsData.get.shardKeys == rhsData.get.shardKeys && | ||
lhsData.get.tschemaLabels == rhsData.get.tschemaLabels && | ||
lhsData.get.tschemaLabels.diff(shardColumns.toSet).subsetOf(bj.on.toSet) | ||
hasScalar || canPushdownJoin | ||
} | ||
if (canPushdown) { | ||
// account for empty pushdown shards (i.e. lhs/rhs is a scalar) | ||
if (lhsData.get.shards.nonEmpty) lhsData else rhsData | ||
} else None | ||
case agg: Aggregate => | ||
// target schema labels for all leaves must be discoverable, equal, and preserved by join keys | ||
val shards = helper(agg.vectors) | ||
val canPushdown = shards != None && | ||
val pushdownData = helper(agg.vectors) | ||
val canPushdown = pushdownData.isDefined && | ||
agg.clauseOpt.isDefined && | ||
agg.clauseOpt.get.clauseType == AggregateClause.ClauseType.By && | ||
{ | ||
val targetSchemaLabels = | ||
getUniversalTargetSchemaLabels(agg, targetSchemaProvider(qContext)) | ||
targetSchemaLabels.isDefined && | ||
{ | ||
val byLabels = agg.clauseOpt.get.labels | ||
targetSchemaLabels.get.toSet.diff(shardColumns.toSet).subsetOf(byLabels.toSet) | ||
} | ||
pushdownData.get.tschemaLabels.diff(shardColumns.toSet).subsetOf(byLabels.toSet) | ||
} | ||
if (canPushdown) shards else None | ||
if (canPushdown) pushdownData else None | ||
case nl: NonLeafLogicalPlan => | ||
// Pessimistically require that this entire subtree lies on one shard (or none, if all scalars). | ||
val shardGroups = nl.children.map(helper(_)) | ||
if (shardGroups.forall(_.isDefined)) { | ||
val shards = shardGroups.flatMap(_.get).toSet | ||
// if zero elements, then all children are scalars | ||
if (shards.size <= 1) Some(shards) else None | ||
val pushdownDatas = nl.children.map(helper(_)) | ||
if (pushdownDatas.nonEmpty && pushdownDatas.forall(_.isDefined)) { | ||
val referenceData = pushdownDatas.map(_.get) | ||
.find(data => data.shards.nonEmpty) | ||
.getOrElse(pushdownDatas.head.get) | ||
val allDatasEqual = pushdownDatas.tail | ||
.forall(data => data.get.shards.isEmpty || data.get == referenceData) | ||
if (allDatasEqual) Some(referenceData) else None | ||
} else None | ||
case rs: RawSeries => | ||
val shards = qContext.plannerParams.shardOverrides.map(_.toSet) | ||
.orElse(getShardSpanFromLp(rs, qContext)) | ||
val tschemaLabels = getTargetSchemaLabelsFromRawSeries(rs, qContext) | ||
val shardKeyCols = dataset.options.nonMetricShardColumns | ||
val shardKeys = getNonMetricShardKeyFilters(rs, shardKeyCols) | ||
.flatten | ||
.map(filter => filter.column -> filter.filter.valuesStrings.head.asInstanceOf[String]) | ||
.toMap | ||
if (tschemaLabels.isDefined) { | ||
Some(PushdownData(shards.get, shardKeys, tschemaLabels.map(_.toSet).getOrElse(Set.empty))) | ||
} else None | ||
case rs: RawSeries => getShardSpanFromLp(rs, qContext) | ||
case sc: ScalarPlan => Some(Set.empty) // don't want a None to end shard-group propagation | ||
// Don't want a Scalar to end PushdownData propagation, so we'll use | ||
// Set.empty to signify propagation should continue. | ||
case sc: ScalarPlan => Some(PushdownData(Set.empty, Map.empty, Set.empty)) | ||
case _ => None | ||
} | ||
helper(lp) | ||
helper(lp).map(_.shards) | ||
} | ||
// scalastyle:on method.length | ||
// scalastyle:on cyclomatic.complexity | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
startEndMsPairOpt.map()
is cleaner than pattern match that returnNone
forNone