diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
index 05901cba8..6c545afb4 100644
--- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala
@@ -365,6 +365,24 @@ class SingleClusterPlanner(val dataset: Dataset,
   }
   // scalastyle:on cyclomatic.complexity
 
+  /**
+   * Returns the target-schema labels (if any) that will be used to materialize a RawSeries.
+   */
+  private def getTargetSchemaLabelsFromRawSeries(rs: RawSeries, qContext: QueryContext): Option[Seq[String]] = {
+    val tsp = targetSchemaProvider(qContext)
+    val filters = getColumnFilterGroup(rs).map(_.toSeq)
+    // Get time params from the RangeSelector, and use them to identify a TargetSchemaChange.
+    val startEndMsPairOpt = rs.rangeSelector match {
+      case is: IntervalSelector => Some((is.from, is.to))
+      case _ => None
+    }
+    val tsFunc = tsp.targetSchemaFunc(filters.head)
+    startEndMsPairOpt match {
+      case Some((startMs, endMs)) => findTargetSchema(tsFunc, startMs, endMs).map(_.schema)
+      case _ => None
+    }
+  }
+
   // scalastyle:off method.length
   /**
    * Returns the shards spanned by a LogicalPlan's data.
@@ -387,18 +405,11 @@ class SingleClusterPlanner(val dataset: Dataset,
       assert(filters.size == 1, s"expected leaf plan to yield single filter group, but got ${filters.size}")
       leafPlan match {
         case rs: RawSeries =>
-          // Get time params from the RangeSelector, and use them to identify a TargetSchemaChanges.
           val startEndMsPairOpt = rs.rangeSelector match {
             case is: IntervalSelector => Some((is.from, is.to))
             case _ => None
           }
-          val tsLabels: Option[Seq[String]] = {
-            val tsFunc = tsp.targetSchemaFunc(filters.head)
-            startEndMsPairOpt match {
-              case Some((startMs, endMs)) => findTargetSchema(tsFunc, startMs, endMs).map(_.schema)
-              case _ => None
-            }
-          }
+          val tsLabels: Option[Seq[String]] = getTargetSchemaLabelsFromRawSeries(rs, qContext)
           val (startMs, endMs): (Long, Long) = startEndMsPairOpt.getOrElse((0, Long.MaxValue))
           leafInfos.append(LeafInfo(leafPlan, startMs, endMs, renameMetricFilter(filters.head), tsLabels))
         // Do nothing; not pulling data from any shards.
@@ -435,7 +446,10 @@ class SingleClusterPlanner(val dataset: Dataset,
    */
   private def getPushdownShards(qContext: QueryContext,
                                 lp: LogicalPlan): Option[Set[Int]] = {
-    def helper(lp: LogicalPlan): Option[Set[Int]] = lp match {
+    case class PushdownData(shards: Set[Int],
+                            shardKeys: Map[String, String],
+                            tschemaLabels: Set[String])
+    def helper(lp: LogicalPlan): Option[PushdownData] = lp match {
       // VectorPlans can't currently be pushed down. Consider:
       //   foo{...} or vector(0)
       // If foo{...} is sharded with spread=1, but both shards contain no foo{...} data,
@@ -461,49 +475,61 @@
       case aif: ApplyInstantFunction => helper(aif.vectors)
       case bj: BinaryJoin =>
         // lhs/rhs must reside on the same set of shards, and target schema labels for all leaves must be
-        //   discoverable, equal, and preserved by join keys
-        val lhsShards = helper(bj.lhs)
-        val rhsShards = helper(bj.rhs)
-        val canPushdown = lhsShards != None && rhsShards != None &&
-          // either the shard groups are equal, or either of lhs/rhs includes only scalars.
-          (lhsShards == rhsShards || lhsShards.get.isEmpty || rhsShards.get.isEmpty) &&
-          {
-            val targetSchemaLabels =
-              getUniversalTargetSchemaLabels(bj, targetSchemaProvider(qContext))
-            targetSchemaLabels.isDefined &&
-              targetSchemaLabels.get.toSet.diff(shardColumns.toSet).subsetOf(bj.on.toSet)
-          }
-        // union lhs/rhs shards, since one might be empty (if it's a scalar)
-        if (canPushdown) Some(lhsShards.get.union(rhsShards.get)) else None
+        //   discoverable, equal, and preserved by join keys.
+        val lhsData = helper(bj.lhs)
+        val rhsData = helper(bj.rhs)
+        val canPushdown = lhsData.isDefined && rhsData.isDefined && {
+          val hasScalar = lhsData.get.shards.isEmpty || rhsData.get.shards.isEmpty
+          val canPushdownJoin = lhsData.get.shards == rhsData.get.shards &&
+            lhsData.get.shardKeys == rhsData.get.shardKeys &&
+            lhsData.get.tschemaLabels == rhsData.get.tschemaLabels &&
+            lhsData.get.tschemaLabels.diff(shardColumns.toSet).subsetOf(bj.on.toSet)
+          hasScalar || canPushdownJoin
+        }
+        if (canPushdown) {
+          // account for empty pushdown shards (i.e. lhs/rhs is a scalar)
+          if (lhsData.get.shards.nonEmpty) lhsData else rhsData
+        } else None
       case agg: Aggregate =>
         // target schema labels for all leaves must be discoverable, equal, and preserved by join keys
-        val shards = helper(agg.vectors)
-        val canPushdown = shards != None &&
+        val pushdownData = helper(agg.vectors)
+        val canPushdown = pushdownData.isDefined &&
           agg.clauseOpt.isDefined &&
           agg.clauseOpt.get.clauseType == AggregateClause.ClauseType.By && {
-            val targetSchemaLabels =
-              getUniversalTargetSchemaLabels(agg, targetSchemaProvider(qContext))
-            targetSchemaLabels.isDefined &&
-            {
             val byLabels = agg.clauseOpt.get.labels
-              targetSchemaLabels.get.toSet.diff(shardColumns.toSet).subsetOf(byLabels.toSet)
-            }
+            pushdownData.get.tschemaLabels.diff(shardColumns.toSet).subsetOf(byLabels.toSet)
           }
-        if (canPushdown) shards else None
+        if (canPushdown) pushdownData else None
       case nl: NonLeafLogicalPlan =>
         // Pessimistically require that this entire subtree lies on one shard (or none, if all scalars).
-        val shardGroups = nl.children.map(helper(_))
-        if (shardGroups.forall(_.isDefined)) {
-          val shards = shardGroups.flatMap(_.get).toSet
-          // if zero elements, then all children are scalars
-          if (shards.size <= 1) Some(shards) else None
+        val pushdownDatas = nl.children.map(helper(_))
+        if (pushdownDatas.nonEmpty && pushdownDatas.forall(_.isDefined)) {
+          val referenceData = pushdownDatas.map(_.get)
+            .find(data => data.shards.nonEmpty)
+            .getOrElse(pushdownDatas.head.get)
+          val allDatasEqual = pushdownDatas.tail
+            .forall(data => data.get.shards.isEmpty || data.get == referenceData)
+          if (allDatasEqual) Some(referenceData) else None
+        } else None
+      case rs: RawSeries =>
+        val shards = qContext.plannerParams.shardOverrides.map(_.toSet)
+          .orElse(getShardSpanFromLp(rs, qContext))
+        val tschemaLabels = getTargetSchemaLabelsFromRawSeries(rs, qContext)
+        val shardKeyCols = dataset.options.nonMetricShardColumns
+        val shardKeys = getNonMetricShardKeyFilters(rs, shardKeyCols)
+          .flatten
+          .map(filter => filter.column -> filter.filter.valuesStrings.head.asInstanceOf[String])
+          .toMap
+        if (shards.isDefined && tschemaLabels.isDefined) {
+          Some(PushdownData(shards.get, shardKeys, tschemaLabels.get.toSet))
         } else None
-      case rs: RawSeries => getShardSpanFromLp(rs, qContext)
-      case sc: ScalarPlan => Some(Set.empty)  // don't want a None to end shard-group propagation
+      // Don't want a Scalar to end PushdownData propagation, so we'll use
+      // Set.empty to signify propagation should continue.
+      case sc: ScalarPlan => Some(PushdownData(Set.empty, Map.empty, Set.empty))
       case _ => None
     }
-    helper(lp)
+    helper(lp).map(_.shards)
   }
   // scalastyle:on method.length
   // scalastyle:on cyclomatic.complexity
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
index ff676e7be..09db34677 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
@@ -503,8 +503,9 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
     binaryJoinNode.asInstanceOf[BinaryJoinExec].rhs.size shouldEqual 4
   }
 
-  it ("should pushdown BinaryJoins between different shard keys when shards are identical") {
+  it ("should not pushdown BinaryJoins between different shard keys") {
     def spread(filter: Seq[ColumnFilter]): Seq[SpreadChange] = {
+      // Spread across all shards.
       Seq(SpreadChange(0, 5))
     }
     def targetSchema(filter: Seq[ColumnFilter]): Seq[TargetSchemaChange] = {
@@ -520,8 +521,7 @@ class SingleClusterPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
       spreadOverride = Some(FunctionalSpreadProvider(spread)),
       targetSchemaProviderOverride = Some(FunctionalTargetSchemaProvider(targetSchema)),
       queryTimeoutMillis = 1000000)))
-    execPlan.isInstanceOf[LocalPartitionDistConcatExec] shouldEqual true
-    execPlan.children.forall(c => c.isInstanceOf[BinaryJoinExec])
+    execPlan.isInstanceOf[LocalPartitionDistConcatExec] shouldEqual false
  }
 }
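
For intuition, the rule this diff introduces can be distilled outside FiloDB's planner types: a join is
pushdown-eligible only when both sides resolve to the same shards, the same shard keys, and the same
target-schema labels (with the non-shard-key labels preserved by the join keys), while scalar subtrees
stay neutral. The sketch below is illustrative only: Leaf, Join, Concat, and ScalarLeaf are hypothetical
stand-ins for RawSeries, BinaryJoin, NonLeafLogicalPlan, and ScalarPlan, and shardColumns stands in for
dataset.options.nonMetricShardColumns.

// A minimal, self-contained sketch of the PushdownData propagation above; the plan
// types here are simplified stand-ins, not FiloDB's LogicalPlan hierarchy.
object PushdownSketch extends App {
  // Mirrors the case class added in this diff.
  case class PushdownData(shards: Set[Int],
                          shardKeys: Map[String, String],
                          tschemaLabels: Set[String])

  sealed trait Plan
  case class Leaf(shards: Set[Int],
                  shardKeys: Map[String, String],
                  tschemaLabels: Option[Set[String]]) extends Plan
  case class Join(lhs: Plan, rhs: Plan, on: Set[String]) extends Plan
  case class Concat(children: Seq[Plan]) extends Plan
  case object ScalarLeaf extends Plan

  // Stand-in for dataset.options.nonMetricShardColumns.
  val shardColumns: Set[String] = Set("_ws_", "_ns_")

  def helper(plan: Plan): Option[PushdownData] = plan match {
    // A leaf without discoverable target-schema labels ends pushdown eligibility.
    case Leaf(shards, keys, Some(labels)) => Some(PushdownData(shards, keys, labels))
    case Leaf(_, _, None) => None
    // Scalars are neutral: empty data keeps propagation alive without constraining siblings.
    case ScalarLeaf => Some(PushdownData(Set.empty, Map.empty, Set.empty))
    case Join(lhs, rhs, on) =>
      for {
        l <- helper(lhs)
        r <- helper(rhs)
        // Either one side is a scalar, or both sides carry identical pushdown data
        // and the join keys preserve the non-shard-key target-schema labels.
        if l.shards.isEmpty || r.shards.isEmpty ||
          (l == r && l.tschemaLabels.diff(shardColumns).subsetOf(on))
      } yield if (l.shards.nonEmpty) l else r
    case Concat(children) =>
      val datas = children.map(helper)
      if (datas.nonEmpty && datas.forall(_.isDefined)) {
        // All non-scalar children must agree on shards, shard keys, and labels.
        val ref = datas.flatten.find(_.shards.nonEmpty).getOrElse(datas.head.get)
        if (datas.flatten.forall(d => d.shards.isEmpty || d == ref)) Some(ref) else None
      } else None
  }

  val appA = Leaf(Set(0, 1), Map("_ns_" -> "App-A"), Some(Set("job")))
  val appB = Leaf(Set(0, 1), Map("_ns_" -> "App-B"), Some(Set("job")))
  println(helper(Join(appA, appA, on = Set("job"))))      // Some(...): identical sides push down
  println(helper(Join(appA, appB, on = Set("job"))))      // None: shard keys differ
  println(helper(Join(appA, ScalarLeaf, on = Set.empty))) // Some(...): scalars never block pushdown
}

With identical shards and target-schema labels but different shard keys (e.g. _ns_="App-A" vs.
_ns_="App-B"), the join now yields None, which is exactly the behavior the updated test asserts: the plan
is no longer a LocalPartitionDistConcatExec of per-shard joins.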