Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): extend @modifier to long-time-range and multi-partition queries. #1837

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,23 +133,32 @@ trait DefaultPlanner {
val execRangeFn = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) Last
else InternalRangeFunction.lpToInternalFunc(logicalPlanWithoutBucket.function)

val realScanStartMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.startMs)
val realScanEndMs = lp.atMs.getOrElse(logicalPlanWithoutBucket.endMs)
val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs)

val paramsExec = materializeFunctionArgs(logicalPlanWithoutBucket.functionArgs, qContext)
val window = if (execRangeFn == InternalRangeFunction.Timestamp) None else Some(logicalPlanWithoutBucket.window)
series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(logicalPlanWithoutBucket.startMs,
logicalPlanWithoutBucket.stepMs, logicalPlanWithoutBucket.endMs, window, Some(execRangeFn), qContext,
series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs,
realScanStepMs, realScanEndMs, window, Some(execRangeFn), qContext,
logicalPlanWithoutBucket.stepMultipleNotationUsed,
paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource = rawSource)))
if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
val result = if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
val aggregate = Aggregate(AggregationOperator.Sum, logicalPlanWithoutBucket, Nil,
AggregateClause.byOpt(Seq("job")))
// Add sum to aggregate all child responses
// If all children have NaN value, sum will yield NaN and AbsentFunctionMapper will yield 1
val aggregatePlanResult = PlanResult(Seq(addAggregator(aggregate, qContext.copy(plannerParams =
qContext.plannerParams.copy(skipAggregatePresent = true)), series)))
addAbsentFunctionMapper(aggregatePlanResult, logicalPlanWithoutBucket.columnFilters,
RangeParams(logicalPlanWithoutBucket.startMs / 1000, logicalPlanWithoutBucket.stepMs / 1000,
logicalPlanWithoutBucket.endMs / 1000), qContext)
RangeParams(realScanStartMs, realScanStepMs, realScanEndMs), qContext)
} else series
// repeat the same value for each step if '@' is specified
if (lp.atMs.nonEmpty) {
result.plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(lp.startMs, lp.stepMs, lp.endMs,
p.queryWithPlanName(qContext))))
}
result
}

private[queryplanner] def removeBucket(lp: Either[PeriodicSeries, PeriodicSeriesWithWindowing]):
Expand Down Expand Up @@ -202,25 +211,53 @@ trait DefaultPlanner {

} else (None, None, lp)

val realScanStartMs = lp.atMs.getOrElse(lp.startMs)
val realScanEndMs = lp.atMs.getOrElse(lp.endMs)
val realScanStepMs: Long = lp.atMs.map(_ => 0L).getOrElse(lp.stepMs)

val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess)
val rawSource = lpWithoutBucket.rawSeries.isRaw && (lpWithoutBucket.rawSeries match {
case r: RawSeries => !r.supportsRemoteDataCall
case _ => true
})
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs,
window = None, functionId = None, qContext, stepMultipleNotationUsed = false, funcParams = Nil,
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs, realScanStepMs,
realScanEndMs, window = None, functionId = None, qContext, stepMultipleNotationUsed = false, funcParams = Nil,
lp.offsetMs, rawSource = rawSource)))

if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) {
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(lp.startMs / 1000, lp.stepMs / 1000,
lp.endMs / 1000))
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(realScanStartMs / 1000, realScanStepMs / 1000,
realScanEndMs / 1000))
rawSeries.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(HistogramBucket,
Seq(paramsExec))))
}
// repeat the same value for each step if '@' is specified
if (lp.atMs.nonEmpty) {
rawSeries.plans.foreach(p => p.addRangeVectorTransformer(RepeatTransformer(lp.startMs, lp.stepMs, lp.endMs,
p.queryWithPlanName(qContext))))
}
rawSeries
}

def materializeApplyInstantFunction(qContext: QueryContext,
/**
 * Collects every '@' modifier timestamp present in the given periodic-series plan,
 * shifted back by the corresponding offset (a missing offset counts as zero).
 * Leaf-like plan shapes contribute their own `atMs` directly; wrapper plans
 * recurse into their children; scalar/meta plans contribute nothing.
 *
 * @param periodicSeriesPlan the plan to scan for '@' modifier timestamps
 * @return offset-adjusted '@' timestamps in milliseconds; empty when none are present
 */
protected def getAtModifierTimestampsWithOffset(periodicSeriesPlan: PeriodicSeriesPlan): Seq[Long] = {
  // Shift an optional '@' timestamp back by the plan's offset and lift it into a Seq.
  def shifted(atMs: Option[Long], offsetMs: Option[Long]): Seq[Long] =
    atMs.map(_ - offsetMs.getOrElse(0L)).toSeq
  periodicSeriesPlan match {
    case ps: PeriodicSeries               => shifted(ps.atMs, ps.offsetMs)
    case sww: SubqueryWithWindowing       => shifted(sww.atMs, sww.offsetMs)
    case psw: PeriodicSeriesWithWindowing => shifted(psw.atMs, psw.offsetMs)
    // TopLevelSubquery carries its offset under a different name.
    case ts: TopLevelSubquery             => shifted(ts.atMs, ts.originalOffsetMs)
    // Binary joins may carry '@' on either side; gather both.
    case bj: BinaryJoin =>
      getAtModifierTimestampsWithOffset(bj.lhs) ++ getAtModifierTimestampsWithOffset(bj.rhs)
    // Single-child wrappers: recurse into the wrapped vectors.
    case agg: Aggregate                   => getAtModifierTimestampsWithOffset(agg.vectors)
    case aif: ApplyInstantFunction        => getAtModifierTimestampsWithOffset(aif.vectors)
    case amf: ApplyMiscellaneousFunction  => getAtModifierTimestampsWithOffset(amf.vectors)
    case asf: ApplySortFunction           => getAtModifierTimestampsWithOffset(asf.vectors)
    case aaf: ApplyAbsentFunction         => getAtModifierTimestampsWithOffset(aaf.vectors)
    case alf: ApplyLimitFunction          => getAtModifierTimestampsWithOffset(alf.vectors)
    // Scalar and metadata plans can never carry an '@' modifier of their own.
    case _: RawChunkMeta | _: ScalarBinaryOperation | _: ScalarFixedDoublePlan | _: ScalarTimeBasedPlan |
         _: ScalarVaryingDoublePlan | _: ScalarVectorBinaryOperation | _: VectorPlan => Seq()
  }
}


def materializeApplyInstantFunction(qContext: QueryContext,
lp: ApplyInstantFunction,
forceInProcess: Boolean = false): PlanResult = {
val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ object LogicalPlanUtils extends StrictLogging {
// is an interval that we want to process on one machine (primarily to compute range functions).
// SubqueryWithWindowing has such a lookback while TopLevelSubquery does not.
logicalPlan match {
case psw: PeriodicSeriesWithWindowing => Seq(psw.window)
case sww: SubqueryWithWindowing => getLookBackMillis(sww.innerPeriodicSeries).map(lb => lb + sww.subqueryWindowMs)
case nl: NonLeafLogicalPlan => nl.children.flatMap(getLookBackMillis)
case rs: RawSeries => Seq(rs.lookbackMs.getOrElse(WindowConstants.staleDataLookbackMillis))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,23 @@ import filodb.query.exec._
PlanResult(Seq(execPlan))
}

private def getAtModifierTimestampsWithOffset(periodicSeriesPlan: PeriodicSeriesPlan): Seq[Long] = {
periodicSeriesPlan match {
case ps: PeriodicSeries => ps.atMs.map(at => at - ps.offsetMs.getOrElse(0L)).toSeq
case sww: SubqueryWithWindowing => sww.atMs.map(at => at - sww.offsetMs.getOrElse(0L)).toSeq
case psw: PeriodicSeriesWithWindowing => psw.atMs.map(at => at - psw.offsetMs.getOrElse(0L)).toSeq
case ts: TopLevelSubquery => ts.atMs.map(at => at - ts.originalOffsetMs.getOrElse(0L)).toSeq
case bj: BinaryJoin => getAtModifierTimestampsWithOffset(bj.lhs) ++ getAtModifierTimestampsWithOffset(bj.rhs)
case agg: Aggregate => getAtModifierTimestampsWithOffset(agg.vectors)
case aif: ApplyInstantFunction => getAtModifierTimestampsWithOffset(aif.vectors)
case amf: ApplyMiscellaneousFunction => getAtModifierTimestampsWithOffset(amf.vectors)
case asf: ApplySortFunction => getAtModifierTimestampsWithOffset(asf.vectors)
case aaf: ApplyAbsentFunction => getAtModifierTimestampsWithOffset(aaf.vectors)
case alf: ApplyLimitFunction => getAtModifierTimestampsWithOffset(alf.vectors)
case _: RawChunkMeta | _: ScalarBinaryOperation | _: ScalarFixedDoublePlan | _: ScalarTimeBasedPlan|
_: ScalarVaryingDoublePlan | _: ScalarVectorBinaryOperation | _: VectorPlan => Seq()
/**
 * Determines whether a plan using the '@' modifier effectively spans BOTH the
 * downsample cluster and the raw cluster, i.e. its widened time range starts at or
 * before the latest downsampled timestamp AND ends at or after the earliest raw
 * timestamp. Plans without an '@' modifier (and non-periodic plans) return false.
 */
private def isPlanLongTimeAtModifier(plan: LogicalPlan): Boolean = {
plan match {
case periodicSeriesPlan: PeriodicSeriesPlan if periodicSeriesPlan.hasAtModifier =>
val earliestRawTime = earliestRawTimestampFn
val offsetMillis = LogicalPlanUtils.getOffsetMillis(periodicSeriesPlan)
// NOTE(review): .max/.min throw on an empty Seq — presumably getOffsetMillis always
// returns at least one element (0 when no offset); confirm.
val (maxOffset, minOffset) = (offsetMillis.max, offsetMillis.min)
// Widest lookback across the plan, so the start bound is conservative.
val lookbackMs = LogicalPlanUtils.getLookBackMillis(periodicSeriesPlan).max
val latestDownsampleTimestamp = latestDownsampleTimestampFn
// '@' timestamps already adjusted by their offsets.
// NOTE(review): atMsList.min/.max also throw when empty — hasAtModifier presumably
// guarantees at least one '@' timestamp is found by the traversal; confirm.
val atMsList = getAtModifierTimestampsWithOffset(periodicSeriesPlan)
// Widen the query range to cover both the offset-shifted start/end and the
// '@' timestamps (including their lookback window).
val realStartMs = Math.min(periodicSeriesPlan.startMs - maxOffset, atMsList.min - lookbackMs)
val realEndMs = Math.max(periodicSeriesPlan.endMs - minOffset, atMsList.max)

// True only when the widened range overlaps both clusters' retention windows.
realStartMs <= latestDownsampleTimestamp && realEndMs >= earliestRawTime
case _ => false
}
}


// scalastyle:off method.length
private def materializeRoutablePlan(qContext: QueryContext, periodicSeriesPlan: PeriodicSeriesPlan): ExecPlan = {
import LogicalPlan._
Expand All @@ -74,23 +72,25 @@ import filodb.query.exec._
val (maxOffset, minOffset) = (offsetMillis.max, offsetMillis.min)

val lookbackMs = LogicalPlanUtils.getLookBackMillis(periodicSeriesPlan).max

val startWithOffsetMs = periodicSeriesPlan.startMs - maxOffset
// For scalar binary operation queries like sum(rate(foo{job = "app"}[5m] offset 8d)) * 0.5
val endWithOffsetMs = periodicSeriesPlan.endMs - minOffset
val atModifierTimestampsWithOffset = getAtModifierTimestampsWithOffset(periodicSeriesPlan)
val startWithOffsetMs = if (periodicSeriesPlan.hasAtModifier) atModifierTimestampsWithOffset.min
else periodicSeriesPlan.startMs - maxOffset
// For scalar binary operation queries like sum(rate(foo{job = "app"}[5m] offset 8d)) * 0.5
val endWithOffsetMs =
if (periodicSeriesPlan.hasAtModifier) atModifierTimestampsWithOffset.max else periodicSeriesPlan.endMs - minOffset
// val atModifierTimestampsWithOffset = getAtModifierTimestampsWithOffset(periodicSeriesPlan)

val isAtModifierValid = if (startWithOffsetMs - lookbackMs >= earliestRawTime) {
// should be in raw cluster.
atModifierTimestampsWithOffset.forall(at => at - lookbackMs >= earliestRawTime)
} else if (endWithOffsetMs - lookbackMs < earliestRawTime) {
// should be in down sample cluster.
atModifierTimestampsWithOffset.forall(at => at - lookbackMs < earliestRawTime)
} else {
atModifierTimestampsWithOffset.isEmpty
}
require(isAtModifierValid, s"@modifier $atModifierTimestampsWithOffset is not supported. Because it queries" +
s"both down sampled and raw data. Please adjust the query parameters if you want to use @modifier.")
// val isAtModifierValid = if (startWithOffsetMs - lookbackMs >= earliestRawTime) {
// // should be in raw cluster.
// atModifierTimestampsWithOffset.forall(at => at - lookbackMs >= earliestRawTime)
// } else if (endWithOffsetMs - lookbackMs < earliestRawTime) {
// // should be in down sample cluster.
// atModifierTimestampsWithOffset.forall(at => at - lookbackMs < earliestRawTime)
// } else {
// atModifierTimestampsWithOffset.isEmpty
// }
// require(isAtModifierValid, s"@modifier $atModifierTimestampsWithOffset is not supported. Because it queries" +
// s"both down sampled and raw data. Please adjust the query parameters if you want to use @modifier.")

if (maxOffset != minOffset
&& startWithOffsetMs - lookbackMs < earliestRawTime
Expand Down Expand Up @@ -119,7 +119,8 @@ import filodb.query.exec._
// should check for ANY interval overlapping earliestRawTime. This
// can happen with ANY lookback interval, not just the last one.
} else if (
endWithOffsetMs - lookbackMs < earliestRawTime || //TODO lookbacks can overlap in the middle intervals too
(endWithOffsetMs - lookbackMs < earliestRawTime && !periodicSeriesPlan.hasAtModifier)
|| //TODO lookbacks can overlap in the middle intervals too
LogicalPlan.hasSubqueryWithWindowing(periodicSeriesPlan)
) {
// For subqueries and long lookback queries, we keep things simple by routing to
Expand Down Expand Up @@ -245,8 +246,8 @@ import filodb.query.exec._
override def walkLogicalPlanTree(logicalPlan: LogicalPlan,
qContext: QueryContext,
forceInProcess: Boolean = false): PlanResult = {

if (!LogicalPlanUtils.hasBinaryJoin(logicalPlan)) {
val isLongTimeAtModifier = isPlanLongTimeAtModifier(logicalPlan)
if (!LogicalPlanUtils.hasBinaryJoin(logicalPlan) && !isLongTimeAtModifier) {
logicalPlan match {
case p: PeriodicSeriesPlan => materializePeriodicSeriesPlan(qContext, p)
case lc: LabelCardinality => materializeLabelCardinalityPlan(lc, qContext)
Expand All @@ -266,8 +267,10 @@ import filodb.query.exec._
case lp: PeriodicSeriesWithWindowing => materializePeriodicSeriesPlan(qContext, lp)
case lp: ApplyInstantFunction => super.materializeApplyInstantFunction(qContext, lp)
case lp: ApplyInstantFunctionRaw => super.materializeApplyInstantFunctionRaw(qContext, lp)
case lp: Aggregate => materializePeriodicSeriesPlan(qContext, lp)
case lp: BinaryJoin => materializePeriodicSeriesPlan(qContext, lp)
case lp: Aggregate if isLongTimeAtModifier => super.materializeAggregate(qContext, lp, forceInProcess = true)
case lp: Aggregate if !isLongTimeAtModifier => materializePeriodicSeriesPlan(qContext, lp)
case lp: BinaryJoin if isLongTimeAtModifier => super.materializeBinaryJoin(qContext, lp, forceInProcess = true)
case lp: BinaryJoin if !isLongTimeAtModifier => materializePeriodicSeriesPlan(qContext, lp)
case lp: ScalarVectorBinaryOperation => super.materializeScalarVectorBinOp(qContext, lp)
case lp: LabelValues => rawClusterMaterialize(qContext, lp)
case lp: TsCardinalities => materializeTSCardinalityPlan(qContext, lp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
}
)
)
case _: RawSeries => materializePeriodicAndRawSeries(logicalPlan, qContext)
case _ : LogicalPlan => super.defaultWalkLogicalPlanTree(logicalPlan, qContext, forceInProcess)
}
} else {
Expand Down Expand Up @@ -352,7 +353,16 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
val stepMs = queryParams.stepSecs * 1000
val isInstantQuery: Boolean = if (queryParams.startSecs == queryParams.endSecs) true else false
var prevPartitionStart = queryParams.startSecs * 1000
val execPlans = partitions.zipWithIndex.map { case (p, i) =>
val execPlans = partitions
.filter(p => {
if (!logicalPlan.hasAtModifier || !logicalPlan.isInstanceOf[PeriodicSeriesPlan])
true
else {
val time = getAtModifierTimestampsWithOffset(logicalPlan.asInstanceOf[PeriodicSeriesPlan]).head
p.timeRange.startMs <= time && p.timeRange.endMs > time
}
})
.zipWithIndex.map { case (p, i) =>
// First partition should start from query start time, no need to calculate time according to step for instant
// queries
val startMs =
Expand Down Expand Up @@ -400,12 +410,14 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
timeRangeOverride: Option[TimeRange] = None): ExecPlan = {
val qContextWithOverride = timeRangeOverride.map{ r =>
val oldParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val newParams = oldParams.copy(startSecs = r.startMs / 1000, endSecs = r.endMs / 1000)
val newParams = oldParams.copy(startSecs = r.startMs / 1000, endSecs = r.endMs / 1000,
promQl = if (logicalPlan.hasAtModifier) LogicalPlanParser.convertToQuery(logicalPlan) else oldParams.promQl)
queryContext.copy(origQueryParams = newParams)
}.getOrElse(queryContext)
val queryParams = qContextWithOverride.origQueryParams.asInstanceOf[PromQlQueryParams]
val timeRange = timeRangeOverride.getOrElse(TimeRange(1000 * queryParams.startSecs, 1000 * queryParams.endSecs))
val (partitionName, grpcEndpoint) = (partition.partitionName, partition.grpcEndPoint)

if (partitionName.equals(localPartitionName)) {
// FIXME: the below check is needed because subquery tests fail when their
// time-ranges are updated even with the original query params.
Expand Down Expand Up @@ -549,22 +561,29 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
.filter(_.isInstanceOf[RawSeries])
.map(getPartitions(_, qParams))
.exists(_.size > 1)
if (hasMultiPartitionLeaves) {
if (hasMultiPartitionLeaves && !logicalPlan.hasAtModifier) {
materializeSplitLeafPlan(logicalPlan, qContext)
} else { logicalPlan match {
case agg: Aggregate => materializeAggregate(agg, qContext)
case psw: PeriodicSeriesWithWindowing => materializePeriodicAndRawSeries(psw, qContext)
case sqw: SubqueryWithWindowing => super.materializeSubqueryWithWindowing(qContext, sqw)
case bj: BinaryJoin => materializeMultiPartitionBinaryJoinNoSplitLeaf(bj, qContext)
case sv: ScalarVectorBinaryOperation => super.materializeScalarVectorBinOp(qContext, sv)
case aif: ApplyInstantFunction => super.materializeApplyInstantFunction(qContext, aif)
case svdp: ScalarVaryingDoublePlan => super.materializeScalarPlan(qContext, svdp)
case aaf: ApplyAbsentFunction => super.materializeAbsentFunction(qContext, aaf)
case ps: PeriodicSeries => materializePeriodicAndRawSeries(ps, qContext)
case rcm: RawChunkMeta => materializePeriodicAndRawSeries(rcm, qContext)
case rs: RawSeries => materializePeriodicAndRawSeries(rs, qContext)
case x => throw new IllegalArgumentException(s"unhandled type: ${x.getClass}")
}}
} else {
// either have @modifier or only one partition.
val canSplit = hasMultiPartitionLeaves && logicalPlan.hasAtModifier
logicalPlan match {
case agg: Aggregate => materializeAggregate(agg, qContext)
case psw: PeriodicSeriesWithWindowing if canSplit =>
super.materializePeriodicSeriesWithWindowing(qContext, psw, forceInProcess = true)
case psw: PeriodicSeriesWithWindowing if !canSplit => materializePeriodicAndRawSeries(psw, qContext)
case sqw: SubqueryWithWindowing => super.materializeSubqueryWithWindowing(qContext, sqw)
case bj: BinaryJoin => materializeMultiPartitionBinaryJoinNoSplitLeaf(bj, qContext)
case sv: ScalarVectorBinaryOperation => super.materializeScalarVectorBinOp(qContext, sv)
case aif: ApplyInstantFunction => super.materializeApplyInstantFunction(qContext, aif)
case svdp: ScalarVaryingDoublePlan => super.materializeScalarPlan(qContext, svdp)
case aaf: ApplyAbsentFunction => super.materializeAbsentFunction(qContext, aaf)
case ps: PeriodicSeries if canSplit => super.materializePeriodicSeries(qContext, ps, forceInProcess = true)
case ps: PeriodicSeries if !canSplit => materializePeriodicAndRawSeries(ps, qContext)
case rcm: RawChunkMeta => materializePeriodicAndRawSeries(rcm, qContext)
case rs: RawSeries => materializePeriodicAndRawSeries(rs, qContext)
case x => throw new IllegalArgumentException(s"unhandled type: ${x.getClass}")
}
}
}

/**
Expand Down
Loading
Loading