Skip to content

Commit

Permalink
novelty v2 routes
Browse files Browse the repository at this point in the history
# Description

GitOrigin-RevId: 969178620a790e619bc2f545cf0bf02096d685a6
  • Loading branch information
stevenbenjamin authored and thatbot-copy[bot] committed Sep 23, 2024
1 parent b5d1394 commit b8b88f6
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.webjars.WebJarAssetLocator

import com.thatdot.quine.app.config.BaseConfig
import com.thatdot.quine.app.routes.websocketquinepattern.WebSocketQuinePatternServer
import com.thatdot.quine.app.v2api.{OssApiInterface, V2OssRoutes}
import com.thatdot.quine.app.v2api.{OssApiMethods, V2OssRoutes}
import com.thatdot.quine.app.{BaseApp, BuildInfo, QuineApp}
import com.thatdot.quine.graph._
import com.thatdot.quine.gremlin.GremlinQueryRunner
Expand Down Expand Up @@ -140,7 +140,7 @@ class QuineAppRoutes(

if (apiV2Enabled) {
val v2Route = new V2OssRoutes(
new OssApiInterface(graph.asInstanceOf[GraphService], quineApp.asInstanceOf[QuineApp], config, timeout),
new OssApiMethods(graph.asInstanceOf[GraphService], quineApp.asInstanceOf[QuineApp], config, timeout),
).v2Routes
logger.warn(safe"Starting with Api V2 endpoints enabled")
v1Routes ~ v2Route
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import org.apache.pekko.util.Timeout

import com.thatdot.quine.app.QuineApp
import com.thatdot.quine.app.config.{BaseConfig, QuineConfig}
import com.thatdot.quine.app.v2api.definitions.ApplicationApiInterface
import com.thatdot.quine.app.v2api.definitions.QuineApiMethods
import com.thatdot.quine.graph.GraphService
import com.thatdot.quine.util.Log._
class OssApiInterface(
class OssApiMethods(
val graph: GraphService,
val quineApp: QuineApp,
val app: QuineApp,
val config: BaseConfig,
val timeout: Timeout,
)(implicit val logConfig: LogConfig)
extends ApplicationApiInterface
extends QuineApiMethods
with LazySafeLogging {
val thisMemberIdx: Int = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import com.thatdot.quine.util.Log._

/** Gathering of Quine OSS tapir-defined routes.
*/
class V2OssRoutes(val app: OssApiInterface)(implicit protected val logConfig: LogConfig)
class V2OssRoutes(val appMethods: OssApiMethods)(implicit protected val logConfig: LogConfig)
extends TapirRoutes
with V2AdministrationEndpoints
with V2StandingEndpoints
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import scala.util.control.NonFatal
import scala.util.{Either, Failure, Success, Try}

import org.apache.pekko.Done
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.{Materializer, StreamDetachedException}
import org.apache.pekko.util.Timeout

import cats.data.NonEmptyList
Expand All @@ -21,6 +21,7 @@ import io.circe.Json
import com.thatdot.quine.app.config.BaseConfig
import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator
import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator.ErrorString
import com.thatdot.quine.app.routes.IngestApiEntities.PauseOperationException
import com.thatdot.quine.app.routes._
import com.thatdot.quine.app.v2api.endpoints.V2AdministrationEndpointEntities.{TGraphHashCode, TQuineInfo}
import com.thatdot.quine.app.v2api.endpoints.V2AlgorithmEndpointEntities.TSaveLocation
Expand All @@ -46,20 +47,74 @@ import com.thatdot.quine.routes._
import com.thatdot.quine.util.Log.LogConfig
import com.thatdot.quine.util.SwitchMode
import com.thatdot.quine.{BuildInfo => QuineBuildInfo, model}

/** Access to application components for api endpoints */
trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
val graph: BaseGraph with LiteralOpsGraph with StandingQueryOpsGraph with CypherOpsGraph with AlgorithmGraph

trait ApplicationApiMethods {
val graph: BaseGraph with LiteralOpsGraph with CypherOpsGraph with StandingQueryOpsGraph
val app: BaseApp
implicit def timeout: Timeout

implicit val logConfig: LogConfig

implicit def materializer: Materializer = graph.materializer
val config: BaseConfig
def emptyConfigExample: BaseConfig

val quineApp: BaseApp with StandingQueryStore with IngestStreamState
def isReady = graph.isReady

val config: BaseConfig
def isLive = true
}
// retained functionality methods from in v1 route definitions
import com.thatdot.quine.app.routes.{AlgorithmMethods => V1AlgorithmMethods}

/** Encapsulates access to the running components of quine for individual endpoints. */
trait QuineApiMethods extends ApplicationApiMethods with V1AlgorithmMethods {

override val graph: BaseGraph with LiteralOpsGraph with StandingQueryOpsGraph with CypherOpsGraph with AlgorithmGraph
override val app: BaseApp with StandingQueryStore with IngestStreamState

// duplicated from, com.thatdot.quine.app.routes.IngestApiMethods
private def stream2Info(conf: IngestStreamWithControl[IngestStreamConfiguration]): Future[IngestStreamInfo] =
conf.status.map { status =>
IngestStreamInfo(
status,
conf.terminated().value collect { case Failure(exception) => exception.toString },
conf.settings,
conf.metrics.toEndpointResponse,
)
}(graph.shardDispatcherEC)

private def setIngestStreamPauseState(
name: String,
namespace: NamespaceId,
newState: SwitchMode,
)(implicit logConfig: LogConfig): Future[Option[IngestStreamInfoWithName]] =
app.getIngestStreamFromState(name, namespace) match {
case None => Future.successful(None)
case Some(ingest: IngestStreamWithControl[UnifiedIngestConfiguration]) =>
ingest.initialStatus match {
case IngestStreamStatus.Completed => Future.failed(PauseOperationException.Completed)
case IngestStreamStatus.Terminated => Future.failed(PauseOperationException.Terminated)
case IngestStreamStatus.Failed => Future.failed(PauseOperationException.Failed)
case _ =>
val flippedValve = ingest.valve().flatMap(_.flip(newState))(graph.nodeDispatcherEC)
val ingestStatus = flippedValve.flatMap { _ =>
// HACK: set the ingest's "initial status" to "Paused". `stream2Info` will use this as the stream status
// when the valve is closed but the stream is not terminated. However, this assignment is not threadsafe,
// and this directly violates the semantics of `initialStatus`. This should be fixed in a future refactor.
ingest.initialStatus = IngestStreamStatus.Paused
stream2Info(ingest.copy(settings = ingest.settings.asV1Config))
}(graph.nodeDispatcherEC)
ingestStatus.map(status => Some(status.withName(name)))(ExecutionContext.parasitic)
}
}

private def mkPauseOperationError[ERROR_TYPE](
operation: String,
toError: String => ERROR_TYPE,
): PartialFunction[Throwable, Either[ERROR_TYPE, Nothing]] = {
case _: StreamDetachedException =>
// A StreamDetachedException always occurs when the ingest has failed
Left(toError(s"Cannot $operation a failed ingest."))
case e: PauseOperationException =>
Left(toError(s"Cannot $operation a ${e.statusMsg} ingest."))
}

def thisMemberIdx: Int

Expand All @@ -71,18 +126,18 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {

def getNamespaces: Future[List[String]] = Future.apply {
graph.requiredGraphIsReady()
quineApp.getNamespaces.map(namespaceToString).toList
app.getNamespaces.map(namespaceToString).toList
}(ExecutionContext.parasitic)

def createNamespace(namespace: String): Future[Boolean] =
quineApp.createNamespace(Some(Symbol(namespace)))
app.createNamespace(Some(Symbol(namespace)))

def deleteNamespace(namespace: String): Future[Boolean] =
quineApp.deleteNamespace(Some(Symbol(namespace)))
app.deleteNamespace(Some(Symbol(namespace)))

def listAllStandingQueries: Future[List[RegisteredStandingQuery]] = {
implicit val executor = ExecutionContext.parasitic
Future.sequence(quineApp.getNamespaces.map(quineApp.getStandingQueries)).map(_.toList.flatten)
Future.sequence(app.getNamespaces.map(app.getStandingQueries)).map(_.toList.flatten)
}

// --------------------- Admin Endpoints ------------------------
Expand All @@ -106,12 +161,6 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
)
}

def emptyConfigExample: BaseConfig

def isReady = graph.isReady

def isLive = true

def performShutdown(): Future[Unit] = graph.system.terminate().map(_ => ())(ExecutionContext.parasitic)

def metaData: Future[Map[String, String]] = {
Expand Down Expand Up @@ -139,7 +188,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {

def listStandingQueries(namespaceId: NamespaceId): Future[List[RegisteredStandingQuery]] =
graph.requiredGraphIsReadyFuture {
quineApp.getStandingQueries(namespaceId)
app.getStandingQueries(namespaceId)
}

def propagateStandingQuery(
Expand Down Expand Up @@ -171,7 +220,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
Future.successful(Left(BadRequest(s"Cannot create output `$outputName`: ${errors.toList.mkString(",")}")))

case _ =>
quineApp
app
.addStandingQueryOutput(name, outputName, namespaceId, sqResultOutput)
.map {
case Some(false) => Left(BadRequest(s"There is already a standing query output named '$outputName'"))
Expand All @@ -185,7 +234,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
outputName: String,
namespaceId: NamespaceId,
): Future[Option[StandingQueryResultOutputUserDef]] = graph.requiredGraphIsReadyFuture {
quineApp.removeStandingQueryOutput(name, outputName, namespaceId)
app.removeStandingQueryOutput(name, outputName, namespaceId)
}

def createSQ(
Expand All @@ -194,7 +243,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
sq: StandingQueryDefinition,
): Future[Either[CustomError, Option[Unit]]] =
graph.requiredGraphIsReadyFuture {
try quineApp
try app
.addStandingQuery(name, namespaceId, sq)
.map {
case false => Left(BadRequest(s"There is already a standing query named '$name'"))
Expand All @@ -210,10 +259,10 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
}

def deleteSQ(name: String, namespaceId: NamespaceId): Future[Option[RegisteredStandingQuery]] =
quineApp.cancelStandingQuery(name, namespaceId)
app.cancelStandingQuery(name, namespaceId)

def getSQ(name: String, namespaceId: NamespaceId): Future[Option[RegisteredStandingQuery]] =
quineApp.getStandingQuery(name, namespaceId)
app.getStandingQuery(name, namespaceId)

// --------------------- Cypher Endpoints ------------------------

Expand Down Expand Up @@ -415,7 +464,9 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
.literalOps(namespaceId)
.getProps(qid, atTime)
.map(m =>
m.get(Symbol(propKey)).map(_.deserialized.get).map(qv => QuineValue.toJson(qv)(graph.idProvider, logConfig)),
m.get(Symbol(propKey))
.map(_.deserialized.get)
.map(qv => QuineValue.toJson(qv)(graph.idProvider, logConfig)),
)(
graph.nodeDispatcherEC,
)
Expand All @@ -429,7 +480,9 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
.zip(edgesF)
.map { case (props, edges) =>
TLiteralNode(
props.map { case (k, v) => k.name -> QuineValue.toJson(v.deserialized.get)(graph.idProvider, logConfig) },
props.map { case (k, v) =>
k.name -> QuineValue.toJson(v.deserialized.get)(graph.idProvider, logConfig)
},
edges.toSeq.map { case HalfEdge(t, d, o) => TRestHalfEdge(t.name, toApiEdgeDirection(d), o) },
)
}(graph.nodeDispatcherEC)
Expand Down Expand Up @@ -490,7 +543,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
settings: V2IngestConfiguration,
namespaceId: NamespaceId,
): Either[CustomError, Unit] =
quineApp.addV2IngestStream(
app.addV2IngestStream(
ingestName,
settings,
namespaceId,
Expand All @@ -509,7 +562,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
def deleteIngestStream(ingestName: String, namespaceId: NamespaceId): Future[Option[IngestStreamInfoWithName]] =
graph.requiredGraphIsReadyFuture {

quineApp.removeIngestStream(ingestName, namespaceId) match {
app.removeIngestStream(ingestName, namespaceId) match {
case None => Future.successful(None)
case Some(control @ IngestStreamWithControl(settings, metrics, _, terminated, close, _, _)) =>
val finalStatus = control.status.map { previousStatus =>
Expand Down Expand Up @@ -574,7 +627,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {

def ingestStreamStatus(ingestName: String, namespaceId: NamespaceId): Future[Option[IngestStreamInfoWithName]] =
graph.requiredGraphIsReadyFuture {
quineApp.getIngestStream(ingestName, namespaceId) match {
app.getIngestStream(ingestName, namespaceId) match {
case None => Future.successful(None)
case Some(stream) => stream2Info(stream).map(s => Some(s.withName(ingestName)))(graph.shardDispatcherEC)
}
Expand All @@ -584,7 +637,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods {
graph.requiredGraphIsReadyFuture {
Future
.traverse(
quineApp.getIngestStreams(namespaceId).toList,
app.getIngestStreams(namespaceId).toList,
) { case (name, ingest) =>
stream2Info(ingest).map(name -> _)(graph.shardDispatcherEC)
}(implicitly, graph.shardDispatcherEC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ abstract class TapirRoutes {
/** List of endpoints that should not appear in api docs. */
protected val hiddenEndpoints: Set[ServerEndpoint[Any, Future]]

val app: ApplicationApiInterface
val appMethods: ApplicationApiMethods
private def docEndpoints: Seq[ServerEndpoint[Any, Future]] =
RedocInterpreter(redocUIOptions = RedocUIOptions.default.copy(pathPrefix = List("v2docs")))
.fromServerEndpoints[Future](apiEndpoints.filterNot(hiddenEndpoints.contains(_)), "thatdot-api-v2", "1.0.0")
Expand Down
Loading

0 comments on commit b8b88f6

Please sign in to comment.