From 0566d7c8bf7d43f91ea7a2b2a09f2a6525d3a7b4 Mon Sep 17 00:00:00 2001 From: Christopher Meade <2099307+cmeade@users.noreply.github.com> Date: Mon, 9 Dec 2024 17:44:37 -0800 Subject: [PATCH] Always report results in MVSQ when last reported value is unknown LocalPropertyState and LabelsState were refactored to leave the responsibility of reporting initial results after waking up to the caller of onNodeEvents. This worked when the waking node created new MVSQ states in response to learning about new queries created while it was asleep. In that situation, new MVSQ state values were constructed, onNodeEvents would be called with an empty subscriber map for the initialization side effects, and then the results returned by readResults would be given to the new subscriber. This failed for MVSQ states that existed when the node went to sleep. Those states would be rehydrated, and they would receive the events as arguments to onNodeEvents during the usual runPostActions invocation. runPostActions would not call readResults on the outside, so if they were not reported inside onNodeEvents, they would just be dropped. The goal of the refactor was to reduce conditional logic in onNodeEvents that detected whether it was being used just for initialization. Instead, the result reporting was made unconditional, but when initializing, the subscriber collection was made empty, making it a noop. The assumption was that reporting results would be done on the outside. This change removes the assumption that not knowing what you reported last means the state can leave the reporting to the caller. In the face of uncertainty, the MVSQ states are now always responsible for reporting results. When being initialized, this is harmless, since the subscribers are blanked out. When being rehydrated and receiving events, this makes the MVSQ states always produce the necessary results. GitOrigin-RevId: b9aa2222df7688bce8a3631f1ea58e6c2047b259 --- .../MultipleValuesStandingQueryState.scala | 89 +++++++------------ .../graph/standing/AllPropertiesState.scala | 4 +- .../graph/standing/LabelsStateTests.scala | 40 ++++++--- .../standing/LocalPropertyStateTests.scala | 8 +- 4 files changed, 73 insertions(+), 68 deletions(-) diff --git a/quine-core/src/main/scala/com/thatdot/quine/graph/cypher/MultipleValuesStandingQueryState.scala b/quine-core/src/main/scala/com/thatdot/quine/graph/cypher/MultipleValuesStandingQueryState.scala index e9957edc..6f716841 100644 --- a/quine-core/src/main/scala/com/thatdot/quine/graph/cypher/MultipleValuesStandingQueryState.scala +++ b/quine-core/src/main/scala/com/thatdot/quine/graph/cypher/MultipleValuesStandingQueryState.scala @@ -105,7 +105,7 @@ sealed abstract class MultipleValuesStandingQueryState extends LazySafeLogging { * @see https://github.com/thatdot/quine-plus/pull/2280#discussion_r1115372792 * @see https://github.com/thatdot/quine-plus/pull/2522 * @param effectHandler handler for external effects - * @return whether the standing query state was updated (eg. is there anything new to save?) + * @return whether the standing query state may have been updated (eg. is there anything new to save?) */ def onNodeEvents( events: Seq[NodeChangeEvent], @@ -438,10 +438,6 @@ final case class AllPropertiesState(queryPartId: MultipleValuesStandingQueryPart if (previousProperties == lastReportedProperties) { // the result has not changed, no need to report. This case is only expected when the node is first woken up. false - } else if (previousProperties.isEmpty) { - // Optimization: This is our first time calling onNodeEvents after wake, and we have determined that we could - // report, but we choose not to, because a report will be generated by a call to [[readResults]] later. - false } else { val result = QueryContext.empty + (query.aliasedAs -> propertiesAsCypher( lastReportedProperties.get, @@ -514,13 +510,16 @@ final case class LocalPropertyState( */ var valueAtLastReport: Option[Option[model.PropertyValue]] = None + // TODO: Clarify the conditionals that depend on valueAtLastReport and lastReportWasAMatch, potentially by collapsing + // both vars into a single var with a composite value. + /** Whether we have affirmatively matched based on [[valueAtLastReport]]. - * If we haven't yet reported since registering/waking, this is false. + * If we haven't yet reported since registering/waking, this is None. * * Not persisted, but will be appropriately initialized by first call to [[onNodeEvents]] * @see [[valueAtLastReport]] */ - var lastReportWasAMatch: Boolean = false + var lastReportWasAMatch: Option[Boolean] = None override def relevantEventTypes(labelsPropertyKey: Symbol): Seq[WatchableEventType.PropertyChange] = { if (query.propKey == labelsPropertyKey) { @@ -565,7 +564,9 @@ final case class LocalPropertyState( val somethingChanged = query.aliasedAs match { case Some(alias) => // the query cares about all changes to the property, even those that bring it from matching to still matching - if (!valueAtLastReport.contains(currentProperty) && currentPropertyDoesMatch) { + val knowSameResultReported = valueAtLastReport.contains(currentProperty) + val unknownIfChangedOrKnowChanged = !knowSameResultReported + if (unknownIfChangedOrKnowChanged && currentPropertyDoesMatch) { val currentPropertyExpr = currentProperty .map(pv => @@ -574,21 +575,15 @@ final case class LocalPropertyState( ) .getOrElse(Expr.Null) val result = QueryContext.empty + (alias -> currentPropertyExpr) - lastReportWasAMatch = true - - { - // Optimization to reduce duplicate results: - // If this isn't the first call to [[onNodeEvents]] since wake, we need to report the change. - // If this is, a call to [[readResults]] will do so for us (optimization) - if (valueAtLastReport.nonEmpty) effectHandler.reportUpdatedResults(result :: Nil) - } + + effectHandler.reportUpdatedResults(result :: Nil) + true // we issued a new result - } else if (valueAtLastReport.contains(currentProperty)) { + } else if (knowSameResultReported) { // the property hasn't actually changed, so we don't need to do anything false - } else if (lastReportWasAMatch) { - // we used to match but no longer do -- cancel the previous positive result - lastReportWasAMatch = false + } else if (lastReportWasAMatch.isEmpty || lastReportWasAMatch.contains(true)) { // !currentPropertyDoesMatch + // we used to match but no longer do, or we aren't sure -- cancel any previous positive result effectHandler.reportUpdatedResults(Nil) true // we issued a new result } else { @@ -597,23 +592,17 @@ final case class LocalPropertyState( } case None => // the query only cares about changes that bring the property from not matching to matching or vice versa - if (lastReportWasAMatch != currentPropertyDoesMatch) { + if (!lastReportWasAMatch.contains(currentPropertyDoesMatch)) { val resultGroup = if (currentPropertyDoesMatch) { // we do match, but we didn't use to -- so emit one empty (but positive!) result. - lastReportWasAMatch = true QueryContext.empty :: Nil } else { // we don't match, but we used to -- so emit that nothing matches. - lastReportWasAMatch = false Nil } - { - // Optimization to reduce duplicate results: - // If this isn't the first call to [[onNodeEvents]] since wake, we need to report the change. - // If this is, a call to [[readResults]] will do so for us (optimization) - if (valueAtLastReport.nonEmpty) effectHandler.reportUpdatedResults(resultGroup) - } + + effectHandler.reportUpdatedResults(resultGroup) true } else { // nothing changed that we need to report - no-op. @@ -621,6 +610,7 @@ final case class LocalPropertyState( } } valueAtLastReport = Some(currentProperty) + lastReportWasAMatch = Some(currentPropertyDoesMatch) somethingChanged } .getOrElse { @@ -628,7 +618,7 @@ final case class LocalPropertyState( // If this is the first call to [[onNodeEvents]] since wake, the property must be None/null, so track that if (valueAtLastReport.isEmpty) { valueAtLastReport = Some(None) - lastReportWasAMatch = query.propConstraint.satisfiedByNone + lastReportWasAMatch = Some(query.propConstraint.satisfiedByNone) } // nothing changed that needs persistence false @@ -689,13 +679,13 @@ final case class LabelsState(queryPartId: MultipleValuesStandingQueryPartId) ext var lastReportedLabels: Option[Set[Symbol]] = None /** Whether we have affirmatively matched based on [[lastReportedLabels]]. - * If we haven't yet reported since registering/waking, this is false. + * If we haven't yet reported since registering/waking, this is None. * * Not persisted, but will be appropriately initialized by first call to [[onNodeEvents]] * * @see [[lastReportedLabels]] */ - var lastReportWasAMatch: Boolean = false + var lastReportWasAMatch: Option[Boolean] = None override def relevantEventTypes(labelsPropertyKey: Symbol): Seq[WatchableEventType] = Seq( WatchableEventType.PropertyChange(labelsPropertyKey), @@ -720,30 +710,25 @@ final case class LabelsState(queryPartId: MultipleValuesStandingQueryPartId) ext case PropertyRemoved(_, _) => None } val currentLabels = extractLabels(labelsValue) - lazy val matched = query.constraint(currentLabels) + val matched = query.constraint(currentLabels) val somethingChanged: Boolean = query.aliasedAs match { case Some(alias) => // the query cares about all changes to the node's labels, even those that bring it from matching to still // matching - if (!lastReportedLabels.contains(currentLabels) && matched) { + val knowSameResultReported = lastReportedLabels.contains(currentLabels) + val unknownIfChangedOrKnowChanged = !knowSameResultReported + if (unknownIfChangedOrKnowChanged && matched) { val labelsAsExpr = Expr.List(currentLabels.map(_.name).map(Expr.Str).toVector) val result = QueryContext.empty + (alias -> labelsAsExpr) - lastReportWasAMatch = true - - { - // Optimization to reduce duplicate results: - // If this isn't the first call to [[onNodeEvents]] since wake, we need to report the change. - // If this is, a call to [[readResults]] will do so for us (optimization) - if (lastReportedLabels.nonEmpty) effectHandler.reportUpdatedResults(result :: Nil) - } + + effectHandler.reportUpdatedResults(result :: Nil) true // we issued a new result - } else if (lastReportedLabels.contains(currentLabels)) { + } else if (knowSameResultReported) { // the property hasn't actually changed, so we don't need to do anything false - } else if (lastReportWasAMatch) { + } else if (lastReportWasAMatch.isEmpty || lastReportWasAMatch.contains(true)) { // !matched // we used to match but no longer do -- cancel the previous positive result - lastReportWasAMatch = false effectHandler.reportUpdatedResults(Nil) true // we issued a new result } else { @@ -753,24 +738,17 @@ final case class LabelsState(queryPartId: MultipleValuesStandingQueryPartId) ext case None => // the query only cares about the presence or absense of labels, not their values -- we only // need to send a report when we go from matching to not matching or visa versa - if (lastReportWasAMatch != matched) { + if (!lastReportWasAMatch.contains(matched)) { val resultGroup = if (matched) { // we do match, but we didn't use to -- so emit one empty (but positive!) result. - lastReportWasAMatch = true QueryContext.empty :: Nil } else { // we don't match, but we used to -- so emit that nothing matches. - lastReportWasAMatch = false Nil } - { - // Optimization to reduce duplicate results: - // If this isn't the first call to [[onNodeEvents]] since wake, we need to report the change. - // If this is, a call to [[readResults]] will do so for us (optimization) - if (lastReportedLabels.nonEmpty) effectHandler.reportUpdatedResults(resultGroup) - } + effectHandler.reportUpdatedResults(resultGroup) true } else { // nothing changed that we need to report - no-op. @@ -779,6 +757,7 @@ final case class LabelsState(queryPartId: MultipleValuesStandingQueryPartId) ext } lastReportedLabels = Some(currentLabels) + lastReportWasAMatch = Some(matched) somethingChanged } .getOrElse { @@ -786,7 +765,7 @@ final case class LabelsState(queryPartId: MultipleValuesStandingQueryPartId) ext // If this is the first call to [[onNodeEvents]] since wake, there must be no labels, so track that if (lastReportedLabels.isEmpty) { lastReportedLabels = Some(Set.empty) - lastReportWasAMatch = query.constraint(Set.empty) + lastReportWasAMatch = Some(query.constraint(Set.empty)) } // nothing changed that needs persistence false diff --git a/quine-core/src/test/scala/com/thatdot/quine/graph/standing/AllPropertiesState.scala b/quine-core/src/test/scala/com/thatdot/quine/graph/standing/AllPropertiesState.scala index a1e16002..4f6b19e9 100644 --- a/quine-core/src/test/scala/com/thatdot/quine/graph/standing/AllPropertiesState.scala +++ b/quine-core/src/test/scala/com/thatdot/quine/graph/standing/AllPropertiesState.scala @@ -30,7 +30,9 @@ class AllPropertiesStateTest extends AnyFunSuite with OptionValues { ), ), ) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } } diff --git a/quine-core/src/test/scala/com/thatdot/quine/graph/standing/LabelsStateTests.scala b/quine-core/src/test/scala/com/thatdot/quine/graph/standing/LabelsStateTests.scala index 83d0f221..812e386b 100644 --- a/quine-core/src/test/scala/com/thatdot/quine/graph/standing/LabelsStateTests.scala +++ b/quine-core/src/test/scala/com/thatdot/quine/graph/standing/LabelsStateTests.scala @@ -53,7 +53,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromThreeLabels = initialResultOpt.value assert(initialResultFromThreeLabels == Seq(QueryContext.empty)) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -68,7 +70,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromEmptyLabels = initialResultOpt.value assert(initialResultFromEmptyLabels == Seq(QueryContext.empty)) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -162,7 +166,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromThreeLabels = initialResultOpt.value assert(initialResultFromThreeLabels == Seq(makeLabelsRow(alias, Set(Symbol("A"), Symbol("B"), Symbol("C"))))) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -177,7 +183,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromEmptyLabels = initialResultOpt.value assert(initialResultFromEmptyLabels == Seq(makeLabelsRow(alias, Set.empty))) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -280,7 +288,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromThreeLabels = initialResultOpt.value assert(initialResultFromThreeLabels == Seq(QueryContext.empty)) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -295,7 +305,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromEmptyLabels = initialResultOpt.value assert(initialResultFromEmptyLabels == Seq.empty) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -397,7 +409,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromThreeLabels = initialResultOpt.value assert(initialResultFromThreeLabels == Seq(makeLabelsRow(alias, Set(Symbol("A"), Symbol("B"), Symbol("C"))))) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -412,7 +426,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromEmptyLabels = initialResultOpt.value assert(initialResultFromEmptyLabels == Seq.empty) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -526,7 +542,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromThreeLabels = initialResultOpt.value assert(initialResultFromThreeLabels == Seq(makeLabelsRow(alias, Set(Symbol("A"), Symbol("B"), Symbol("C"))))) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -541,7 +559,9 @@ class LabelsStateTests extends AnyFunSuite with OptionValues { ) { (effects, initialResultOpt) => val initialResultFromEmptyLabels = initialResultOpt.value assert(initialResultFromEmptyLabels == Seq.empty) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } diff --git a/quine-core/src/test/scala/com/thatdot/quine/graph/standing/LocalPropertyStateTests.scala b/quine-core/src/test/scala/com/thatdot/quine/graph/standing/LocalPropertyStateTests.scala index 8500a21b..8cbcb165 100644 --- a/quine-core/src/test/scala/com/thatdot/quine/graph/standing/LocalPropertyStateTests.scala +++ b/quine-core/src/test/scala/com/thatdot/quine/graph/standing/LocalPropertyStateTests.scala @@ -311,7 +311,9 @@ class LocalPropertyStateTests extends AnyFunSuite with OptionValues { tempState.initialize(Map(query.propKey -> PropertyValue(QuineValue.Integer(2L)))) { (effects, initialResultOpt) => val initialResultFromNull = initialResultOpt.value assert(initialResultFromNull == Seq()) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } } @@ -320,7 +322,9 @@ class LocalPropertyStateTests extends AnyFunSuite with OptionValues { tempState.initialize(Map(query.propKey -> PropertyValue(QuineValue.Integer(1L)))) { (effects, initialResultOpt) => val initialResultFromMatch = initialResultOpt.value assert(initialResultFromMatch == Seq(QueryContext(Map(query.aliasedAs.get -> Expr.Integer(1L))))) - assert(effects.isEmpty) + assert(effects.subscriptionsCreated.isEmpty) + assert(effects.subscriptionsCancelled.isEmpty) + assert(effects.resultsReported.nonEmpty) } }