Skip to content

Commit

Permalink
Always report results in MVSQ when last reported value is unknown
Browse files Browse the repository at this point in the history
LocalPropertyState and LabelsState were refactored to leave the
responsibility of reporting initial results after waking up to the
caller of onNodeEvents. This worked when the waking node created new
MVSQ states in response to learning about new queries created while it
was asleep. In that situation, new MVSQ state values were constructed,
onNodeEvents would be called with an empty subscriber map for the
initialization side effects, and then the results returned by
readResults would be given to the new subscriber.

This failed for MVSQ states that existed when the node went to sleep.
Those states would be rehydrated, and they would receive the events as
arguments to onNodeEvents during the usual runPostActions invocation.
runPostActions would not call readResults on the outside, so if they
were not reported inside onNodeEvents, they would just be dropped.

The goal of the refactor was to reduce conditional logic in onNodeEvents
that detected whether it was being used just for initialization.
Instead, the result reporting was made unconditional, but when
initializing, the subscriber collection was made empty, making it a
noop. The assumption was that reporting results would be done on the
outside.

This change removes the assumption that not knowing what you reported
last means the state can leave the reporting to the caller. In the face
of uncertainty, the MVSQ states are now always responsible for reporting
results. When being initialized, this is harmless, since the subscribers
are blanked out. When being rehydrated and receiving events, this makes
the MVSQ states always produce the necessary results.

GitOrigin-RevId: b9aa2222df7688bce8a3631f1ea58e6c2047b259
  • Loading branch information
cmeade authored and thatbot-copy[bot] committed Dec 10, 2024
1 parent 76d231a commit 0566d7c
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ sealed abstract class MultipleValuesStandingQueryState extends LazySafeLogging {
* @see https://github.com/thatdot/quine-plus/pull/2280#discussion_r1115372792
* @see https://github.com/thatdot/quine-plus/pull/2522
* @param effectHandler handler for external effects
* @return whether the standing query state was updated (eg. is there anything new to save?)
* @return whether the standing query state may have been updated (eg. is there anything new to save?)
*/
def onNodeEvents(
events: Seq[NodeChangeEvent],
Expand Down Expand Up @@ -438,10 +438,6 @@ final case class AllPropertiesState(queryPartId: MultipleValuesStandingQueryPart
if (previousProperties == lastReportedProperties) {
// the result has not changed, no need to report. This case is only expected when the node is first woken up.
false
} else if (previousProperties.isEmpty) {
// Optimization: This is our first time calling onNodeEvents after wake, and we have determined that we could
// report, but we choose not to, because a report will be generated by a call to [[readResults]] later.
false
} else {
val result = QueryContext.empty + (query.aliasedAs -> propertiesAsCypher(
lastReportedProperties.get,
Expand Down Expand Up @@ -514,13 +510,16 @@ final case class LocalPropertyState(
*/
var valueAtLastReport: Option[Option[model.PropertyValue]] = None

// TODO: Clarify the conditionals that depend on valueAtLastReport and lastReportWasAMatch, potentially by collapsing
// both vars into a single var with a composite value.

/** Whether we have affirmatively matched based on [[valueAtLastReport]].
* If we haven't yet reported since registering/waking, this is false.
* If we haven't yet reported since registering/waking, this is None.
*
* Not persisted, but will be appropriately initialized by first call to [[onNodeEvents]]
* @see [[valueAtLastReport]]
*/
var lastReportWasAMatch: Boolean = false
var lastReportWasAMatch: Option[Boolean] = None

override def relevantEventTypes(labelsPropertyKey: Symbol): Seq[WatchableEventType.PropertyChange] = {
if (query.propKey == labelsPropertyKey) {
Expand Down Expand Up @@ -565,7 +564,9 @@ final case class LocalPropertyState(
val somethingChanged = query.aliasedAs match {
case Some(alias) =>
// the query cares about all changes to the property, even those that bring it from matching to still matching
if (!valueAtLastReport.contains(currentProperty) && currentPropertyDoesMatch) {
val knowSameResultReported = valueAtLastReport.contains(currentProperty)
val unknownIfChangedOrKnowChanged = !knowSameResultReported
if (unknownIfChangedOrKnowChanged && currentPropertyDoesMatch) {
val currentPropertyExpr =
currentProperty
.map(pv =>
Expand All @@ -574,21 +575,15 @@ final case class LocalPropertyState(
)
.getOrElse(Expr.Null)
val result = QueryContext.empty + (alias -> currentPropertyExpr)
lastReportWasAMatch = true

{
// Optimization to reduce duplicate results:
// If this isn't the first call to [[onNodeEvents]] since wake, we need to report the change.
// If this is, a call to [[readResults]] will do so for us (optimization)
if (valueAtLastReport.nonEmpty) effectHandler.reportUpdatedResults(result :: Nil)
}

effectHandler.reportUpdatedResults(result :: Nil)

true // we issued a new result
} else if (valueAtLastReport.contains(currentProperty)) {
} else if (knowSameResultReported) {
// the property hasn't actually changed, so we don't need to do anything
false
} else if (lastReportWasAMatch) {
// we used to match but no longer do -- cancel the previous positive result
lastReportWasAMatch = false
} else if (lastReportWasAMatch.isEmpty || lastReportWasAMatch.contains(true)) { // !currentPropertyDoesMatch
// we used to match but no longer do, or we aren't sure -- cancel any previous positive result
effectHandler.reportUpdatedResults(Nil)
true // we issued a new result
} else {
Expand All @@ -597,38 +592,33 @@ final case class LocalPropertyState(
}
case None =>
// the query only cares about changes that bring the property from not matching to matching or vice versa
if (lastReportWasAMatch != currentPropertyDoesMatch) {
if (!lastReportWasAMatch.contains(currentPropertyDoesMatch)) {
val resultGroup =
if (currentPropertyDoesMatch) {
// we do match, but we didn't use to -- so emit one empty (but positive!) result.
lastReportWasAMatch = true
QueryContext.empty :: Nil
} else {
// we don't match, but we used to -- so emit that nothing matches.
lastReportWasAMatch = false
Nil
}
{
// Optimization to reduce duplicate results:
// If this isn't the first call to [[onNodeEvents]] since wake, we need to report the change.
// If this is, a call to [[readResults]] will do so for us (optimization)
if (valueAtLastReport.nonEmpty) effectHandler.reportUpdatedResults(resultGroup)
}

effectHandler.reportUpdatedResults(resultGroup)
true
} else {
// nothing changed that we need to report - no-op.
false
}
}
valueAtLastReport = Some(currentProperty)
lastReportWasAMatch = Some(currentPropertyDoesMatch)
somethingChanged
}
.getOrElse {
// valueAtLastReport is defined for all but the first time onNodeEvents is called.
// If this is the first call to [[onNodeEvents]] since wake, the property must be None/null, so track that
if (valueAtLastReport.isEmpty) {
valueAtLastReport = Some(None)
lastReportWasAMatch = query.propConstraint.satisfiedByNone
lastReportWasAMatch = Some(query.propConstraint.satisfiedByNone)
}
// nothing changed that needs persistence
false
Expand Down Expand Up @@ -689,13 +679,13 @@ final case class LabelsState(queryPartId: MultipleValuesStandingQueryPartId) ext
var lastReportedLabels: Option[Set[Symbol]] = None

/** Whether we have affirmatively matched based on [[lastReportedLabels]].
* If we haven't yet reported since registering/waking, this is false.
* If we haven't yet reported since registering/waking, this is None.
*
* Not persisted, but will be appropriately initialized by first call to [[onNodeEvents]]
*
* @see [[lastReportedLabels]]
*/
var lastReportWasAMatch: Boolean = false
var lastReportWasAMatch: Option[Boolean] = None

override def relevantEventTypes(labelsPropertyKey: Symbol): Seq[WatchableEventType] = Seq(
WatchableEventType.PropertyChange(labelsPropertyKey),
Expand All @@ -720,30 +710,25 @@ final case class LabelsState(queryPartId: MultipleValuesStandingQueryPartId) ext
case PropertyRemoved(_, _) => None
}
val currentLabels = extractLabels(labelsValue)
lazy val matched = query.constraint(currentLabels)
val matched = query.constraint(currentLabels)

val somethingChanged: Boolean = query.aliasedAs match {
case Some(alias) =>
// the query cares about all changes to the node's labels, even those that bring it from matching to still
// matching
if (!lastReportedLabels.contains(currentLabels) && matched) {
val knowSameResultReported = lastReportedLabels.contains(currentLabels)
val unknownIfChangedOrKnowChanged = !knowSameResultReported
if (unknownIfChangedOrKnowChanged && matched) {
val labelsAsExpr = Expr.List(currentLabels.map(_.name).map(Expr.Str).toVector)
val result = QueryContext.empty + (alias -> labelsAsExpr)
lastReportWasAMatch = true

{
// Optimization to reduce duplicate results:
// If this isn't the first call to [[onNodeEvents]] since wake, we need to report the change.
// If this is, a call to [[readResults]] will do so for us (optimization)
if (lastReportedLabels.nonEmpty) effectHandler.reportUpdatedResults(result :: Nil)
}

effectHandler.reportUpdatedResults(result :: Nil)
true // we issued a new result
} else if (lastReportedLabels.contains(currentLabels)) {
} else if (knowSameResultReported) {
// the property hasn't actually changed, so we don't need to do anything
false
} else if (lastReportWasAMatch) {
} else if (lastReportWasAMatch.isEmpty || lastReportWasAMatch.contains(true)) { // !matched
// we used to match but no longer do -- cancel the previous positive result
lastReportWasAMatch = false
effectHandler.reportUpdatedResults(Nil)
true // we issued a new result
} else {
Expand All @@ -753,24 +738,17 @@ final case class LabelsState(queryPartId: MultipleValuesStandingQueryPartId) ext
case None =>
// the query only cares about the presence or absense of labels, not their values -- we only
// need to send a report when we go from matching to not matching or visa versa
if (lastReportWasAMatch != matched) {
if (!lastReportWasAMatch.contains(matched)) {
val resultGroup =
if (matched) {
// we do match, but we didn't use to -- so emit one empty (but positive!) result.
lastReportWasAMatch = true
QueryContext.empty :: Nil
} else {
// we don't match, but we used to -- so emit that nothing matches.
lastReportWasAMatch = false
Nil
}

{
// Optimization to reduce duplicate results:
// If this isn't the first call to [[onNodeEvents]] since wake, we need to report the change.
// If this is, a call to [[readResults]] will do so for us (optimization)
if (lastReportedLabels.nonEmpty) effectHandler.reportUpdatedResults(resultGroup)
}
effectHandler.reportUpdatedResults(resultGroup)
true
} else {
// nothing changed that we need to report - no-op.
Expand All @@ -779,14 +757,15 @@ final case class LabelsState(queryPartId: MultipleValuesStandingQueryPartId) ext
}

lastReportedLabels = Some(currentLabels)
lastReportWasAMatch = Some(matched)
somethingChanged
}
.getOrElse {
// lastReportedLabels is defined for all but the first time onNodeEvents is called.
// If this is the first call to [[onNodeEvents]] since wake, there must be no labels, so track that
if (lastReportedLabels.isEmpty) {
lastReportedLabels = Some(Set.empty)
lastReportWasAMatch = query.constraint(Set.empty)
lastReportWasAMatch = Some(query.constraint(Set.empty))
}
// nothing changed that needs persistence
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ class AllPropertiesStateTest extends AnyFunSuite with OptionValues {
),
),
)
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromThreeLabels = initialResultOpt.value
assert(initialResultFromThreeLabels == Seq(QueryContext.empty))
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand All @@ -68,7 +70,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromEmptyLabels = initialResultOpt.value
assert(initialResultFromEmptyLabels == Seq(QueryContext.empty))
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand Down Expand Up @@ -162,7 +166,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromThreeLabels = initialResultOpt.value
assert(initialResultFromThreeLabels == Seq(makeLabelsRow(alias, Set(Symbol("A"), Symbol("B"), Symbol("C")))))
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand All @@ -177,7 +183,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromEmptyLabels = initialResultOpt.value
assert(initialResultFromEmptyLabels == Seq(makeLabelsRow(alias, Set.empty)))
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand Down Expand Up @@ -280,7 +288,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromThreeLabels = initialResultOpt.value
assert(initialResultFromThreeLabels == Seq(QueryContext.empty))
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand All @@ -295,7 +305,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromEmptyLabels = initialResultOpt.value
assert(initialResultFromEmptyLabels == Seq.empty)
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand Down Expand Up @@ -397,7 +409,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromThreeLabels = initialResultOpt.value
assert(initialResultFromThreeLabels == Seq(makeLabelsRow(alias, Set(Symbol("A"), Symbol("B"), Symbol("C")))))
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand All @@ -412,7 +426,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromEmptyLabels = initialResultOpt.value
assert(initialResultFromEmptyLabels == Seq.empty)
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand Down Expand Up @@ -526,7 +542,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromThreeLabels = initialResultOpt.value
assert(initialResultFromThreeLabels == Seq(makeLabelsRow(alias, Set(Symbol("A"), Symbol("B"), Symbol("C")))))
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand All @@ -541,7 +559,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues {
) { (effects, initialResultOpt) =>
val initialResultFromEmptyLabels = initialResultOpt.value
assert(initialResultFromEmptyLabels == Seq.empty)
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,9 @@ class LocalPropertyStateTests extends AnyFunSuite with OptionValues {
tempState.initialize(Map(query.propKey -> PropertyValue(QuineValue.Integer(2L)))) { (effects, initialResultOpt) =>
val initialResultFromNull = initialResultOpt.value
assert(initialResultFromNull == Seq())
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand All @@ -320,7 +322,9 @@ class LocalPropertyStateTests extends AnyFunSuite with OptionValues {
tempState.initialize(Map(query.propKey -> PropertyValue(QuineValue.Integer(1L)))) { (effects, initialResultOpt) =>
val initialResultFromMatch = initialResultOpt.value
assert(initialResultFromMatch == Seq(QueryContext(Map(query.aliasedAs.get -> Expr.Integer(1L)))))
assert(effects.isEmpty)
assert(effects.subscriptionsCreated.isEmpty)
assert(effects.subscriptionsCancelled.isEmpty)
assert(effects.resultsReported.nonEmpty)
}
}

Expand Down

0 comments on commit 0566d7c

Please sign in to comment.