Skip to content

Commit

Permalink
QU-1340: Restore Ingest Status
Browse files Browse the repository at this point in the history
* decide initial valveSwitchMode based on saved ingest status

* prevent pause/resume operations on ingests that are not paused or running

* sync ingest status on app shutdown instead of ingest termination

GitOrigin-RevId: 23de403b27bb11e4c94549b9c0fb5ea9957dc7fd
  • Loading branch information
nescohen authored and thatbot-copy[bot] committed Jun 16, 2023
1 parent 182a22d commit a3c2645
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,37 @@ import endpoints4s.generic.{docs, title, unnamed}

import com.thatdot.quine.routes.exts.EndpointsWithCustomErrorText

sealed abstract class IngestStreamStatus(val isTerminal: Boolean)
sealed abstract class ValvePosition(position: String)

object ValvePosition {

case object Open extends ValvePosition("Open")
case object Closed extends ValvePosition("Closed")

}

sealed abstract class IngestStreamStatus(val isTerminal: Boolean, val position: ValvePosition)

object IngestStreamStatus {
@docs("The stream is currently actively running, and possibly waiting for new records to become available upstream.")
case object Running extends IngestStreamStatus(isTerminal = false)
case object Running extends IngestStreamStatus(isTerminal = false, position = ValvePosition.Open)

@docs("The stream has been paused by a user.")
case object Paused extends IngestStreamStatus(isTerminal = false)
case object Paused extends IngestStreamStatus(isTerminal = false, position = ValvePosition.Closed)

@docs("The stream has processed all records, and the upstream data source will not make more records available.")
case object Completed extends IngestStreamStatus(isTerminal = true)
case object Completed extends IngestStreamStatus(isTerminal = true, position = ValvePosition.Closed)

@docs("The stream has been stopped by a user.")
case object Terminated extends IngestStreamStatus(isTerminal = true)
case object Terminated extends IngestStreamStatus(isTerminal = true, position = ValvePosition.Closed)

@docs(
"The stream has been restored from a saved state, but is not yet running: For example, after restarting Quine."
)
case object Restored extends IngestStreamStatus(isTerminal = false)
case object Restored extends IngestStreamStatus(isTerminal = false, position = ValvePosition.Closed)

@docs("The stream has been stopped by a failure during processing.")
case object Failed extends IngestStreamStatus(isTerminal = true)
case object Failed extends IngestStreamStatus(isTerminal = true, position = ValvePosition.Closed)

val states: Seq[IngestStreamStatus] = Seq(Running, Paused, Completed, Terminated, Restored, Failed)
}
Expand Down
54 changes: 23 additions & 31 deletions quine/src/main/scala/com/thatdot/quine/app/QuineApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,44 +305,51 @@ final class QuineApp(graph: GraphService)
s"Ingest stream '$name' has failed after ${metrics.millisSinceStart(now)}ms",
err
)
Try(blocking(ingestStreamsLock.synchronized {
Await.ready(syncIngestStreamsMetaData(thisMemberIdx), 5.seconds)
}))

case Success(_) =>
val now = Instant.now
metrics.stop(now)
logger.info(
s"Ingest stream '$name' successfully completed after ${metrics.millisSinceStart(now)}ms"
)
Try(blocking(ingestStreamsLock.synchronized {
Await.ready(syncIngestStreamsMetaData(thisMemberIdx), 5.seconds)
}))
}(ec)
}

def addIngestStream(
name: String,
settings: IngestStreamConfiguration,
wasRestoredFromStorage: Boolean,
// restoredStatus is None if stream was not restored at all
restoredStatus: Option[IngestStreamStatus],
shouldRestoreIngest: Boolean,
timeout: Timeout
): Try[Boolean] =
blocking(ingestStreamsLock.synchronized {
val valveSwitchMode = restoredStatus match {
case Some(restoredStatus) if shouldRestoreIngest =>
restoredStatus.position match {
case ValvePosition.Open => SwitchMode.Open
case ValvePosition.Closed => SwitchMode.Close
}
case Some(_) =>
SwitchMode.Close
case None =>
SwitchMode.Open
}

if (ingestStreams.contains(name)) {
Success(false)
} else
IngestSrcDef
.createIngestSrcDef(
name,
settings,
if (wasRestoredFromStorage) SwitchMode.Close else SwitchMode.Open
valveSwitchMode
)(graph)
.leftMap(errs => IngestStreamConfiguration.InvalidStreamConfiguration(errs))
.map { ingestSrcDef =>

val metrics = IngestMetrics(Instant.now, None, ingestSrcDef.meter)
val ingestSrc = ingestSrcDef.stream(
shouldResumeIngest = false,
registerTerminationHooks = registerTerminationHooks(name, metrics)(graph.nodeDispatcherEC)
)

Expand All @@ -354,7 +361,7 @@ final class QuineApp(graph: GraphService)
ingestSrcDef.getControl.map(_.valveHandle),
ingestSrcDef.getControl.map(_.termSignal),
ingestSrcDef.getControl.map { c => c.terminate(); () },
restored = wasRestoredFromStorage
restoredStatus = restoredStatus
)

ingestStreams += name -> streamDefWithControl
Expand Down Expand Up @@ -416,21 +423,6 @@ final class QuineApp(graph: GraphService)

/** == Utilities == */

/** If any ingest streams are currently in the `RESTORED` state, start them all. */
def startRestoredIngests(): Unit =
ingestStreams.foreach {
case (_, streamWithControl) if streamWithControl.restored =>
streamWithControl.valve
.unsafeToFuture()
.map { v =>
// In Quine App, this future is always created with `Future.success(…)`. This is ugly.
v.flip(SwitchMode.Open)
streamWithControl.restored =
false // NB: this is not actually done in a separate thread. see previous comment
}(graph.system.dispatcher)
case _ => ()
}

private def stopAllIngestStreams(): Future[Unit] =
Future
.traverse(ingestStreams: TraversableOnce[(String, IngestStreamWithControl[IngestStreamConfiguration])]) {
Expand All @@ -442,8 +434,11 @@ final class QuineApp(graph: GraphService)
.map(_ => ())(graph.system.dispatcher)

/** Prepare for a shutdown */
def shutdown(): Future[Unit] =
stopAllIngestStreams() // ... but don't update what is saved to disk
def shutdown()(implicit ec: ExecutionContext): Future[Unit] =
for {
_ <- syncIngestStreamsMetaData(thisMemberIdx)
_ <- stopAllIngestStreams() // ... but don't update what is saved to disk
} yield ()

/** Load all the state from the persistor
*
Expand Down Expand Up @@ -512,17 +507,14 @@ final class QuineApp(graph: GraphService)
}.toMap

is.foreach { case (name, ingest) =>
addIngestStream(name, ingest.config, wasRestoredFromStorage = true, timeout) match {
addIngestStream(name, ingest.config, restoredStatus = ingest.status, shouldResumeIngest, timeout) match {
case Success(true) => ()
case Success(false) =>
logger.error(s"Duplicate ingest stream attempted to start with name: $name and settings: ${ingest.config}")
case Failure(e) =>
logger.error(s"Error when restoring ingest stream: $name with settings: ${ingest.config}", e)
}
}
if (shouldResumeIngest) {
startRestoredIngests()
}
}(graph.system.dispatcher)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ object RecipeInterpreter {
appState.addIngestStream(
ingestStreamName,
ingestStream,
wasRestoredFromStorage = false,
restoredStatus = None,
shouldRestoreIngest = true,
timeout = 5 seconds
) match {
case Failure(ex) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ abstract class IngestSrcDef(

/** Assembled stream definition.
*/
def stream(): Source[IngestSrcExecToken, NotUsed] = stream(shouldResumeIngest = true, _ => ())
def stream(): Source[IngestSrcExecToken, NotUsed] = stream(_ => ())
def stream(
shouldResumeIngest: Boolean,
registerTerminationHooks: Future[Done] => Unit
): Source[IngestSrcExecToken, NotUsed] =
RestartSource.onFailuresWithBackoff(restartSettings) { () =>
Expand All @@ -134,7 +133,7 @@ abstract class IngestSrcDef(
.watchTermination() { case ((a: ShutdownSwitch, b: Future[ValveSwitch]), c: Future[Done]) =>
b.map(v => ControlSwitches(a, v, c))(ExecutionContexts.parasitic)
}
.mapMaterializedValue(c => setControl(c, shouldResumeIngest, registerTerminationHooks))
.mapMaterializedValue(c => setControl(c, initialSwitchMode, registerTerminationHooks))
.named(name)
}

Expand All @@ -143,12 +142,12 @@ abstract class IngestSrcDef(

private def setControl(
control: Future[QuineAppIngestControl],
shouldResumeIngest: Boolean,
desiredSwitchMode: SwitchMode,
registerTerminationHooks: Future[Done] => Unit
): Unit = {

// Ensure valve is opened if required and termination hooks are registered
if (shouldResumeIngest) control.flatMap(c => c.valveHandle.flip(SwitchMode.Open))(graph.nodeDispatcherEC)
control.foreach(c => c.valveHandle.flip(desiredSwitchMode))(graph.nodeDispatcherEC)
control.map(c => registerTerminationHooks(c.termSignal))(graph.nodeDispatcherEC)

// Set the appropriate ref and deferred ingest control
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.thatdot.quine.app.routes
import java.time.Instant

import scala.compat.ExecutionContexts
import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}

import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
Expand All @@ -20,7 +20,7 @@ import com.thatdot.quine.routes._
import com.thatdot.quine.{BuildInfo => QuineBuildInfo}

trait AdministrationRoutesState {
def shutdown(): Future[Unit]
def shutdown()(implicit ec: ExecutionContext): Future[Unit]
}

/** The Akka HTTP implementation of [[AdministrationRoutes]] */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import java.time.temporal.ChronoUnit.MILLIS
import scala.compat.ExecutionContexts
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}

import akka.http.scaladsl.server.Directives._
Expand All @@ -19,6 +20,7 @@ import cats.effect.IO
import cats.effect.unsafe.implicits.global
import com.codahale.metrics.Metered
import com.typesafe.scalalogging.LazyLogging
import endpoints4s.Invalid
import io.circe.Json

import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator
Expand All @@ -31,7 +33,8 @@ trait IngestStreamState {
def addIngestStream(
name: String,
settings: IngestStreamConfiguration,
wasRestoredFromStorage: Boolean,
restoredStatus: Option[IngestStreamStatus],
shouldRestoreIngest: Boolean,
timeout: Timeout
): Try[Boolean]

Expand Down Expand Up @@ -66,8 +69,8 @@ final private[thatdot] case class IngestStreamWithControl[+Conf](
valve: IO[ValveSwitch],
terminated: IO[Future[Done]],
close: IO[Unit],
var optWs: Option[(Sink[Json, NotUsed], IngestMeter)] = None,
var restored: Boolean = false
var restoredStatus: Option[IngestStreamStatus] = None,
var optWs: Option[(Sink[Json, NotUsed], IngestMeter)] = None
) extends LazyLogging {

// Returns a simpler version of status. Only possible values are completed, failed, or running
Expand Down Expand Up @@ -99,8 +102,7 @@ final private[thatdot] case class IngestStreamWithControl[+Conf](
.getMode()
.map {
case SwitchMode.Open => IngestStreamStatus.Running
case SwitchMode.Close if restored => IngestStreamStatus.Restored
case SwitchMode.Close => IngestStreamStatus.Paused
case SwitchMode.Close => restoredStatus getOrElse IngestStreamStatus.Paused
}(materializer.executionContext)
)
materializer.system.scheduler.scheduleOnce(1.second) {
Expand Down Expand Up @@ -193,7 +195,7 @@ trait IngestRoutesImpl
/** Try to register a new ingest stream */
private val ingestStreamStartRoute = {
def addSettings(name: String, settings: IngestStreamConfiguration) =
serviceState.addIngestStream(name, settings, wasRestoredFromStorage = false, timeout) match {
serviceState.addIngestStream(name, settings, None, shouldRestoreIngest = false, timeout) match {
case Success(false) =>
Left(
endpoints4s.Invalid(
Expand Down Expand Up @@ -285,35 +287,52 @@ trait IngestRoutesImpl
.map(_.toMap)(graph.shardDispatcherEC)
}

sealed private case class PauseOperationException(statusMsg: String) extends Exception with NoStackTrace

private object PauseOperationException {
object Completed extends PauseOperationException("completed")
object Terminated extends PauseOperationException("terminated")
object Failed extends PauseOperationException("failed")
}
private[this] def setIngestStreamPauseState(
name: String,
newState: SwitchMode
): Future[Option[IngestStreamInfoWithName]] =
serviceState.getIngestStream(name) match {
case None => Future.successful(None)
case Some(ingest: IngestStreamWithControl[IngestStreamConfiguration]) =>
val flippedValve = ingest.valve.unsafeToFuture().flatMap(_.flip(newState))(graph.nodeDispatcherEC)
val ingestStatus = flippedValve.flatMap { _ =>
ingest.restored = false; // FIXME not threadsafe
stream2Info(ingest)
}(graph.nodeDispatcherEC)
ingestStatus.map(status => Some(status.withName(name)))(ExecutionContexts.parasitic)
ingest.restoredStatus match {
case Some(IngestStreamStatus.Completed) => Future.failed(PauseOperationException.Completed)
case Some(IngestStreamStatus.Terminated) => Future.failed(PauseOperationException.Terminated)
case Some(IngestStreamStatus.Failed) => Future.failed(PauseOperationException.Failed)
case _ =>
val flippedValve = ingest.valve.unsafeToFuture().flatMap(_.flip(newState))(graph.nodeDispatcherEC)
val ingestStatus = flippedValve.flatMap { _ =>
ingest.restoredStatus = None; // FIXME not threadsafe
stream2Info(ingest)
}(graph.nodeDispatcherEC)
ingestStatus.map(status => Some(status.withName(name)))(ExecutionContexts.parasitic)
}
}

private def mkPauseOperationError(operation: String): PartialFunction[Throwable, Either[Invalid, Nothing]] = {
case _: StreamDetachedException =>
// A StreamDetachedException always occurs when the ingest has failed
Left(endpoints4s.Invalid(s"Cannot ${operation} a failed ingest."))
case e: PauseOperationException =>
Left(endpoints4s.Invalid(s"Cannot ${operation} a ${e.statusMsg} ingest."))
}

private val ingestStreamPauseRoute = ingestStreamPause.implementedByAsync { (name: String) =>
setIngestStreamPauseState(name, SwitchMode.Close)
.map(Right(_))(ExecutionContexts.parasitic)
.recover { case _: StreamDetachedException =>
Left(endpoints4s.Invalid("Cannot pause a failed ingest."))
}(ExecutionContexts.parasitic)
.recover(mkPauseOperationError("pause"))(ExecutionContexts.parasitic)
}

private val ingestStreamUnpauseRoute = ingestStreamUnpause.implementedByAsync { (name: String) =>
setIngestStreamPauseState(name, SwitchMode.Open)
.map(Right(_))(ExecutionContexts.parasitic)
.recover { case _: StreamDetachedException =>
Left(endpoints4s.Invalid("Cannot resume a failed ingest."))
}(ExecutionContexts.parasitic)
.recover(mkPauseOperationError("resume"))(ExecutionContexts.parasitic)
}

final val ingestRoutes: Route = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.thatdot.quine.graph

import java.io.{File, PrintStream}

import scala.compat.ExecutionContexts
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext}
import scala.io.{BufferedSource, Source}
Expand Down Expand Up @@ -29,7 +30,7 @@ class StandingQueryTest extends AnyFunSuite with BeforeAndAfterAll with Matchers
val quineApp = new QuineApp(graph)
override def afterAll: Unit = {
Await.result(graph.shutdown(), 1.second)
Await.result(quineApp.shutdown(), 1.second)
Await.result(quineApp.shutdown()(ExecutionContexts.parasitic), 1.second)
}

/** Run a recipe and return a file that can be examined for output */
Expand Down

0 comments on commit a3c2645

Please sign in to comment.