diff --git a/quine/src/main/scala/com/thatdot/quine/app/routes/QuineAppRoutes.scala b/quine/src/main/scala/com/thatdot/quine/app/routes/QuineAppRoutes.scala index 28e8d179..de16802a 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/routes/QuineAppRoutes.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/routes/QuineAppRoutes.scala @@ -13,7 +13,7 @@ import org.webjars.WebJarAssetLocator import com.thatdot.quine.app.config.BaseConfig import com.thatdot.quine.app.routes.websocketquinepattern.WebSocketQuinePatternServer -import com.thatdot.quine.app.v2api.{OssApiInterface, V2OssRoutes} +import com.thatdot.quine.app.v2api.{OssApiMethods, V2OssRoutes} import com.thatdot.quine.app.{BaseApp, BuildInfo, QuineApp} import com.thatdot.quine.graph._ import com.thatdot.quine.gremlin.GremlinQueryRunner @@ -140,7 +140,7 @@ class QuineAppRoutes( if (apiV2Enabled) { val v2Route = new V2OssRoutes( - new OssApiInterface(graph.asInstanceOf[GraphService], quineApp.asInstanceOf[QuineApp], config, timeout), + new OssApiMethods(graph.asInstanceOf[GraphService], quineApp.asInstanceOf[QuineApp], config, timeout), ).v2Routes logger.warn(safe"Starting with Api V2 endpoints enabled") v1Routes ~ v2Route diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/OssApiInterface.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/OssApiMethods.scala similarity index 75% rename from quine/src/main/scala/com/thatdot/quine/app/v2api/OssApiInterface.scala rename to quine/src/main/scala/com/thatdot/quine/app/v2api/OssApiMethods.scala index dfabd31b..25fd3420 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/OssApiInterface.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/OssApiMethods.scala @@ -4,16 +4,16 @@ import org.apache.pekko.util.Timeout import com.thatdot.quine.app.QuineApp import com.thatdot.quine.app.config.{BaseConfig, QuineConfig} -import com.thatdot.quine.app.v2api.definitions.ApplicationApiInterface +import com.thatdot.quine.app.v2api.definitions.QuineApiMethods import com.thatdot.quine.graph.GraphService import com.thatdot.quine.util.Log._ -class OssApiInterface( +class OssApiMethods( val graph: GraphService, - val quineApp: QuineApp, + val app: QuineApp, val config: BaseConfig, val timeout: Timeout, )(implicit val logConfig: LogConfig) - extends ApplicationApiInterface + extends QuineApiMethods with LazySafeLogging { val thisMemberIdx: Int = 0 diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/V2OssRoutes.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/V2OssRoutes.scala index 1642ef4d..d0e9d5e6 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/V2OssRoutes.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/V2OssRoutes.scala @@ -18,7 +18,7 @@ import com.thatdot.quine.util.Log._ /** Gathering of Quine OSS tapir-defined routes. */ -class V2OssRoutes(val app: OssApiInterface)(implicit protected val logConfig: LogConfig) +class V2OssRoutes(val appMethods: OssApiMethods)(implicit protected val logConfig: LogConfig) extends TapirRoutes with V2AdministrationEndpoints with V2StandingEndpoints diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/ApplicationApiInterface.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/QuineApiMethods.scala similarity index 83% rename from quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/ApplicationApiInterface.scala rename to quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/QuineApiMethods.scala index 8c793fd0..a338a060 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/ApplicationApiInterface.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/QuineApiMethods.scala @@ -10,8 +10,8 @@ import scala.util.control.NonFatal import scala.util.{Either, Failure, Success, Try} import org.apache.pekko.Done -import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.{Materializer, StreamDetachedException} import org.apache.pekko.util.Timeout import cats.data.NonEmptyList @@ -21,6 +21,7 @@ import io.circe.Json import com.thatdot.quine.app.config.BaseConfig import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator.ErrorString +import com.thatdot.quine.app.routes.IngestApiEntities.PauseOperationException import com.thatdot.quine.app.routes._ import com.thatdot.quine.app.v2api.endpoints.V2AdministrationEndpointEntities.{TGraphHashCode, TQuineInfo} import com.thatdot.quine.app.v2api.endpoints.V2AlgorithmEndpointEntities.TSaveLocation @@ -46,20 +47,74 @@ import com.thatdot.quine.routes._ import com.thatdot.quine.util.Log.LogConfig import com.thatdot.quine.util.SwitchMode import com.thatdot.quine.{BuildInfo => QuineBuildInfo, model} - -/** Access to application components for api endpoints */ -trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { - val graph: BaseGraph with LiteralOpsGraph with StandingQueryOpsGraph with CypherOpsGraph with AlgorithmGraph - +trait ApplicationApiMethods { + val graph: BaseGraph with LiteralOpsGraph with CypherOpsGraph with StandingQueryOpsGraph + val app: BaseApp implicit def timeout: Timeout - implicit val logConfig: LogConfig - implicit def materializer: Materializer = graph.materializer + val config: BaseConfig + def emptyConfigExample: BaseConfig - val quineApp: BaseApp with StandingQueryStore with IngestStreamState + def isReady = graph.isReady - val config: BaseConfig + def isLive = true +} +// retained functionality methods from in v1 route definitions +import com.thatdot.quine.app.routes.{AlgorithmMethods => V1AlgorithmMethods} + +/** Encapsulates access to the running components of quine for individual endpoints. */ +trait QuineApiMethods extends ApplicationApiMethods with V1AlgorithmMethods { + + override val graph: BaseGraph with LiteralOpsGraph with StandingQueryOpsGraph with CypherOpsGraph with AlgorithmGraph + override val app: BaseApp with StandingQueryStore with IngestStreamState + + // duplicated from, com.thatdot.quine.app.routes.IngestApiMethods + private def stream2Info(conf: IngestStreamWithControl[IngestStreamConfiguration]): Future[IngestStreamInfo] = + conf.status.map { status => + IngestStreamInfo( + status, + conf.terminated().value collect { case Failure(exception) => exception.toString }, + conf.settings, + conf.metrics.toEndpointResponse, + ) + }(graph.shardDispatcherEC) + + private def setIngestStreamPauseState( + name: String, + namespace: NamespaceId, + newState: SwitchMode, + )(implicit logConfig: LogConfig): Future[Option[IngestStreamInfoWithName]] = + app.getIngestStreamFromState(name, namespace) match { + case None => Future.successful(None) + case Some(ingest: IngestStreamWithControl[UnifiedIngestConfiguration]) => + ingest.initialStatus match { + case IngestStreamStatus.Completed => Future.failed(PauseOperationException.Completed) + case IngestStreamStatus.Terminated => Future.failed(PauseOperationException.Terminated) + case IngestStreamStatus.Failed => Future.failed(PauseOperationException.Failed) + case _ => + val flippedValve = ingest.valve().flatMap(_.flip(newState))(graph.nodeDispatcherEC) + val ingestStatus = flippedValve.flatMap { _ => + // HACK: set the ingest's "initial status" to "Paused". `stream2Info` will use this as the stream status + // when the valve is closed but the stream is not terminated. However, this assignment is not threadsafe, + // and this directly violates the semantics of `initialStatus`. This should be fixed in a future refactor. + ingest.initialStatus = IngestStreamStatus.Paused + stream2Info(ingest.copy(settings = ingest.settings.asV1Config)) + }(graph.nodeDispatcherEC) + ingestStatus.map(status => Some(status.withName(name)))(ExecutionContext.parasitic) + } + } + + private def mkPauseOperationError[ERROR_TYPE]( + operation: String, + toError: String => ERROR_TYPE, + ): PartialFunction[Throwable, Either[ERROR_TYPE, Nothing]] = { + case _: StreamDetachedException => + // A StreamDetachedException always occurs when the ingest has failed + Left(toError(s"Cannot $operation a failed ingest.")) + case e: PauseOperationException => + Left(toError(s"Cannot $operation a ${e.statusMsg} ingest.")) + } def thisMemberIdx: Int @@ -71,18 +126,18 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { def getNamespaces: Future[List[String]] = Future.apply { graph.requiredGraphIsReady() - quineApp.getNamespaces.map(namespaceToString).toList + app.getNamespaces.map(namespaceToString).toList }(ExecutionContext.parasitic) def createNamespace(namespace: String): Future[Boolean] = - quineApp.createNamespace(Some(Symbol(namespace))) + app.createNamespace(Some(Symbol(namespace))) def deleteNamespace(namespace: String): Future[Boolean] = - quineApp.deleteNamespace(Some(Symbol(namespace))) + app.deleteNamespace(Some(Symbol(namespace))) def listAllStandingQueries: Future[List[RegisteredStandingQuery]] = { implicit val executor = ExecutionContext.parasitic - Future.sequence(quineApp.getNamespaces.map(quineApp.getStandingQueries)).map(_.toList.flatten) + Future.sequence(app.getNamespaces.map(app.getStandingQueries)).map(_.toList.flatten) } // --------------------- Admin Endpoints ------------------------ @@ -106,12 +161,6 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { ) } - def emptyConfigExample: BaseConfig - - def isReady = graph.isReady - - def isLive = true - def performShutdown(): Future[Unit] = graph.system.terminate().map(_ => ())(ExecutionContext.parasitic) def metaData: Future[Map[String, String]] = { @@ -139,7 +188,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { def listStandingQueries(namespaceId: NamespaceId): Future[List[RegisteredStandingQuery]] = graph.requiredGraphIsReadyFuture { - quineApp.getStandingQueries(namespaceId) + app.getStandingQueries(namespaceId) } def propagateStandingQuery( @@ -171,7 +220,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { Future.successful(Left(BadRequest(s"Cannot create output `$outputName`: ${errors.toList.mkString(",")}"))) case _ => - quineApp + app .addStandingQueryOutput(name, outputName, namespaceId, sqResultOutput) .map { case Some(false) => Left(BadRequest(s"There is already a standing query output named '$outputName'")) @@ -185,7 +234,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { outputName: String, namespaceId: NamespaceId, ): Future[Option[StandingQueryResultOutputUserDef]] = graph.requiredGraphIsReadyFuture { - quineApp.removeStandingQueryOutput(name, outputName, namespaceId) + app.removeStandingQueryOutput(name, outputName, namespaceId) } def createSQ( @@ -194,7 +243,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { sq: StandingQueryDefinition, ): Future[Either[CustomError, Option[Unit]]] = graph.requiredGraphIsReadyFuture { - try quineApp + try app .addStandingQuery(name, namespaceId, sq) .map { case false => Left(BadRequest(s"There is already a standing query named '$name'")) @@ -210,10 +259,10 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { } def deleteSQ(name: String, namespaceId: NamespaceId): Future[Option[RegisteredStandingQuery]] = - quineApp.cancelStandingQuery(name, namespaceId) + app.cancelStandingQuery(name, namespaceId) def getSQ(name: String, namespaceId: NamespaceId): Future[Option[RegisteredStandingQuery]] = - quineApp.getStandingQuery(name, namespaceId) + app.getStandingQuery(name, namespaceId) // --------------------- Cypher Endpoints ------------------------ @@ -415,7 +464,9 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { .literalOps(namespaceId) .getProps(qid, atTime) .map(m => - m.get(Symbol(propKey)).map(_.deserialized.get).map(qv => QuineValue.toJson(qv)(graph.idProvider, logConfig)), + m.get(Symbol(propKey)) + .map(_.deserialized.get) + .map(qv => QuineValue.toJson(qv)(graph.idProvider, logConfig)), )( graph.nodeDispatcherEC, ) @@ -429,7 +480,9 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { .zip(edgesF) .map { case (props, edges) => TLiteralNode( - props.map { case (k, v) => k.name -> QuineValue.toJson(v.deserialized.get)(graph.idProvider, logConfig) }, + props.map { case (k, v) => + k.name -> QuineValue.toJson(v.deserialized.get)(graph.idProvider, logConfig) + }, edges.toSeq.map { case HalfEdge(t, d, o) => TRestHalfEdge(t.name, toApiEdgeDirection(d), o) }, ) }(graph.nodeDispatcherEC) @@ -490,7 +543,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { settings: V2IngestConfiguration, namespaceId: NamespaceId, ): Either[CustomError, Unit] = - quineApp.addV2IngestStream( + app.addV2IngestStream( ingestName, settings, namespaceId, @@ -509,7 +562,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { def deleteIngestStream(ingestName: String, namespaceId: NamespaceId): Future[Option[IngestStreamInfoWithName]] = graph.requiredGraphIsReadyFuture { - quineApp.removeIngestStream(ingestName, namespaceId) match { + app.removeIngestStream(ingestName, namespaceId) match { case None => Future.successful(None) case Some(control @ IngestStreamWithControl(settings, metrics, _, terminated, close, _, _)) => val finalStatus = control.status.map { previousStatus => @@ -574,7 +627,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { def ingestStreamStatus(ingestName: String, namespaceId: NamespaceId): Future[Option[IngestStreamInfoWithName]] = graph.requiredGraphIsReadyFuture { - quineApp.getIngestStream(ingestName, namespaceId) match { + app.getIngestStream(ingestName, namespaceId) match { case None => Future.successful(None) case Some(stream) => stream2Info(stream).map(s => Some(s.withName(ingestName)))(graph.shardDispatcherEC) } @@ -584,7 +637,7 @@ trait ApplicationApiInterface extends AlgorithmMethods with IngestApiMethods { graph.requiredGraphIsReadyFuture { Future .traverse( - quineApp.getIngestStreams(namespaceId).toList, + app.getIngestStreams(namespaceId).toList, ) { case (name, ingest) => stream2Info(ingest).map(name -> _)(graph.shardDispatcherEC) }(implicitly, graph.shardDispatcherEC) diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/TapirRoutes.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/TapirRoutes.scala index 59bdd499..ceded90a 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/TapirRoutes.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/TapirRoutes.scala @@ -20,7 +20,7 @@ abstract class TapirRoutes { /** List of endpoints that should not appear in api docs. */ protected val hiddenEndpoints: Set[ServerEndpoint[Any, Future]] - val app: ApplicationApiInterface + val appMethods: ApplicationApiMethods private def docEndpoints: Seq[ServerEndpoint[Any, Future]] = RedocInterpreter(redocUIOptions = RedocUIOptions.default.copy(pathPrefix = List("v2docs"))) .fromServerEndpoints[Future](apiEndpoints.filterNot(hiddenEndpoints.contains(_)), "thatdot-api-v2", "1.0.0") diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/V2EndpointDefinitions.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/V2QuineEndpointDefinitions.scala similarity index 93% rename from quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/V2EndpointDefinitions.scala rename to quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/V2QuineEndpointDefinitions.scala index 378c99ac..6c390063 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/V2EndpointDefinitions.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/definitions/V2QuineEndpointDefinitions.scala @@ -3,7 +3,6 @@ package com.thatdot.quine.app.v2api.definitions import java.nio.charset.{Charset, StandardCharsets} import java.util.concurrent.TimeUnit -import scala.annotation.unused import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -15,7 +14,7 @@ import sttp.tapir.CodecFormat.TextPlain import sttp.tapir.DecodeResult.Value import sttp.tapir.generic.auto.schemaForCaseClass import sttp.tapir.json.circe.TapirJsonCirce -import sttp.tapir.{EndpointOutput, _} +import sttp.tapir.{EndpointOutput, endpoint, _} import com.thatdot.quine.app.v2api.definitions.CustomError.toCustomError import com.thatdot.quine.graph.NamespaceId @@ -32,6 +31,7 @@ trait V2EndpointDefinitions extends TapirJsonCirce with LazySafeLogging { implicit protected def logConfig: LogConfig + val appMethods: ApplicationApiMethods // ------- parallelism ----------- val parallelismParameter: EndpointInput.Query[Option[Int]] = query[Option[Int]](name = "parallelism") .description(s"Operations to execute simultaneously. Default: `${IngestRoutes.defaultWriteParallelism}`") @@ -65,7 +65,8 @@ trait V2EndpointDefinitions extends TapirJsonCirce with LazySafeLogging { } //TODO Use Tapir Validator IdProvider.validate - val idProvider: QuineIdProvider = app.graph.idProvider + + val idProvider: QuineIdProvider = appMethods.graph.idProvider implicit val quineIdCodec: Codec[String, QuineId, TextPlain] = Codec.string.mapDecode(toQuineId)(idProvider.qidToPrettyString) @@ -82,28 +83,10 @@ trait V2EndpointDefinitions extends TapirJsonCirce with LazySafeLogging { type EndpointOutput[T] = Either[ErrorEnvelope[_ <: CustomError], ObjectEnvelope[T]] - /** OSS Specific behavior defined in [[com.thatdot.quine.v2api.V2OssRoutes]]. */ - def memberIdxParameter: EndpointInput[Option[Int]] - - /** OSS Specific behavior defined in [[V2OssRoutes]]. */ - def namespaceParameter: EndpointInput[Option[String]] - - //TODO port logic from QuineEndpoints NamespaceParameter - def namespaceFromParam(ns: Option[String]): NamespaceId = - ns.flatMap(t => Option.when(t != "default")(Symbol(t))) - - def ifNamespaceFound[A](namespaceId: NamespaceId)( - ifFound: => Future[Either[CustomError, A]], - ): Future[Either[CustomError, Option[A]]] = - if (!app.graph.getNamespaces.contains(namespaceId)) Future.successful(Right(None)) - else ifFound.map(_.map(Some(_)))(ExecutionContext.parasitic) - - val app: ApplicationApiInterface - /** Matching error types to api-rendered error outputs. Api Status codes are returned by matching the type of the * custom error to the status code matched here */ - private val commonErrorOutput + protected val commonErrorOutput : EndpointOutput.OneOf[ErrorEnvelope[_ <: CustomError], ErrorEnvelope[_ <: CustomError]] = oneOf[ErrorEnvelope[_ <: CustomError]]( oneOfVariantFromMatchType( @@ -128,11 +111,54 @@ trait V2EndpointDefinitions extends TapirJsonCirce with LazySafeLogging { ), ) - private def toObjectEnvelopeEncoder[T](encoder: Encoder[T]): Encoder[ObjectEnvelope[T]] = (a: ObjectEnvelope[T]) => + protected def toObjectEnvelopeEncoder[T](encoder: Encoder[T]): Encoder[ObjectEnvelope[T]] = (a: ObjectEnvelope[T]) => Json.fromFields(Seq(("data", encoder.apply(a.data)))) - private def toObjectEnvelopeDecoder[T](decoder: Decoder[T]): Decoder[ObjectEnvelope[T]] = + + protected def toObjectEnvelopeDecoder[T](decoder: Decoder[T]): Decoder[ObjectEnvelope[T]] = c => decoder.apply(c.downField("data").root).map(ObjectEnvelope(_)) + def yamlBody[T]()(implicit + schema: Schema[T], + encoder: Encoder[T], + decoder: Decoder[T], + ): EndpointIO.Body[String, T] = stringBodyAnyFormat(YamlCodec.createCodec[T](), StandardCharsets.UTF_8) + + def jsonOrYamlBody[T](implicit + schema: Schema[T], + encoder: Encoder[T], + decoder: Decoder[T], + ): EndpointIO.OneOfBody[T, T] = + oneOfBody[T](jsonBody[T], yamlBody[T]()) + + def textBody[T](codec: Codec[String, T, TextPlain]): EndpointIO.Body[String, T] = + stringBodyAnyFormat(codec, Charset.defaultCharset()) + + /** Wrap output types in their corresponding envelopes. */ + protected def wrapOutput[OUT](value: Either[CustomError, OUT]): EndpointOutput[OUT] = + value.fold(t => Left(ErrorEnvelope(t)), v => Right(ObjectEnvelope(v))) +} + +/** Component definitions for Tapir quine endpoints. */ +trait V2QuineEndpointDefinitions extends V2EndpointDefinitions { + + val appMethods: QuineApiMethods + + /** OSS Specific behavior defined in [[com.thatdot.quine.v2api.V2OssRoutes]]. */ + def memberIdxParameter: EndpointInput[Option[Int]] + + /** OSS Specific behavior defined in [[V2OssRoutes]]. */ + def namespaceParameter: EndpointInput[Option[String]] + + //TODO port logic from QuineEndpoints NamespaceParameter + def namespaceFromParam(ns: Option[String]): NamespaceId = + ns.flatMap(t => Option.when(t != "default")(Symbol(t))) + + def ifNamespaceFound[A](namespaceId: NamespaceId)( + ifFound: => Future[Either[CustomError, A]], + ): Future[Either[CustomError, Option[A]]] = + if (!appMethods.graph.getNamespaces.contains(namespaceId)) Future.successful(Right(None)) + else ifFound.map(_.map(Some(_)))(ExecutionContext.parasitic) + type EndpointBase = Endpoint[Unit, Option[Int], Unit, Unit, Any] /** Base for api/v2 endpoints with common errors that expects the universal parameter memberIdx. @@ -170,23 +196,6 @@ trait V2EndpointDefinitions extends TapirJsonCirce with LazySafeLogging { ): Endpoint[Unit, Option[Int], ErrorEnvelope[_ <: CustomError], ObjectEnvelope[T], Any] = withOutput[T](rawEndpoint(basePaths: _*)) - def yamlBody[T]()(implicit - schema: Schema[T], - encoder: Encoder[T], - decoder: Decoder[T], - ): EndpointIO.Body[String, T] = stringBodyAnyFormat(YamlCodec.createCodec[T](), StandardCharsets.UTF_8) - - @unused - def jsonOrYamlBody[T](implicit - schema: Schema[T], - encoder: Encoder[T], - decoder: Decoder[T], - ): EndpointIO.OneOfBody[T, T] = - oneOfBody[T](jsonBody[T], yamlBody[T]()) - - def textBody[T](codec: Codec[String, T, TextPlain]): EndpointIO.Body[String, T] = - stringBodyAnyFormat(codec, Charset.defaultCharset()) - //TODO split into definitions in extensions of [[TapirRoutes]] that support specific platforms. /** Wrap server responses in their respective output envelopes. */ def runServerLogic[IN, OUT: Decoder]( @@ -214,7 +223,4 @@ trait V2EndpointDefinitions extends TapirJsonCirce with LazySafeLogging { f(in).map(wrapOutput).recover(t => Left(ErrorEnvelope(toCustomError(t)))) } - /** Wrap output types in their corresponding envelopes. */ - protected def wrapOutput[OUT](value: Either[CustomError, OUT]): EndpointOutput[OUT] = - value.fold(t => Left(ErrorEnvelope(t)), v => Right(ObjectEnvelope(v))) } diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2AdministrationEndpoints.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2AdministrationEndpoints.scala index 42c9943e..41d7d1ce 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2AdministrationEndpoints.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2AdministrationEndpoints.scala @@ -125,7 +125,7 @@ object V2AdministrationEndpointEntities { } -trait V2AdministrationEndpoints extends V2EndpointDefinitions { +trait V2AdministrationEndpoints extends V2QuineEndpointDefinitions { implicit lazy val graphHashCodeSchema: Schema[TGraphHashCode] = Schema.derived[TGraphHashCode].description("Graph Hash Code").encodedExample(TGraphHashCode(1000L, 12345L).asJson) @@ -175,12 +175,14 @@ trait V2AdministrationEndpoints extends V2EndpointDefinitions { .description("Returns a JSON object containing information about how Quine was built") .get .serverLogic { memberIdx => - runServerLogic[Unit, TQuineInfo](GetBuildInfoApiCmd, memberIdx, (), _ => Future.successful(app.buildInfo)) + runServerLogic[Unit, TQuineInfo](GetBuildInfoApiCmd, memberIdx, (), _ => Future.successful(appMethods.buildInfo)) } private val configEndpoint = { implicit val configSchema: Schema[ObjectEnvelope[Json]] = - Schema.derived[ObjectEnvelope[Json]].encodedExample(Json.obj(("data", app.emptyConfigExample.loadedConfigJson))) + Schema + .derived[ObjectEnvelope[Json]] + .encodedExample(Json.obj(("data", appMethods.emptyConfigExample.loadedConfigJson))) adminEndpoint[Json]("config") .name("Running Configuration") @@ -194,7 +196,12 @@ server request timeout of this REST API, but it won't show up in the response of endpoint. """).get .serverLogic { memberIdx => - runServerLogic[Unit, Json](GetConfigApiCmd, memberIdx, (), _ => Future.successful(app.config.loadedConfigJson)) + runServerLogic[Unit, Json]( + GetConfigApiCmd, + memberIdx, + (), + _ => Future.successful(appMethods.config.loadedConfigJson), + ) } } @@ -220,7 +227,7 @@ endpoint. GraphHashCodeApiCmd, memberIdx, (atime, namespaceFromParam(ns)), - t => app.graphHashCode(t._1, t._2), + t => appMethods.graphHashCode(t._1, t._2), ) } @@ -246,7 +253,7 @@ up ready and start routing user requests to it. GetReadinessApiCmd, memberIdx, (), - _ => Future.successful(Either.cond(app.isReady, true, ServiceUnavailable("System is not ready"))), + _ => Future.successful(Either.cond(appMethods.isReady, true, ServiceUnavailable("System is not ready"))), ) } @@ -259,7 +266,7 @@ up ready and start routing user requests to it. ShutdownApiCmd, memberIdx, (), - _ => app.performShutdown(), + _ => appMethods.performShutdown(), ) } @@ -273,7 +280,7 @@ up ready and start routing user requests to it. GetMetaDataApiCmd, memberIdx, (), - _ => app.metaData, + _ => appMethods.metaData, ) } @@ -314,7 +321,7 @@ up ready and start routing user requests to it. GetMetricsApiCmd, memberIdx, (), - _ => Future.successful(metricsReportFromV1Metrics(app.metrics)), + _ => Future.successful(metricsReportFromV1Metrics(appMethods.metrics)), ) } @@ -335,7 +342,7 @@ up ready and start routing user requests to it. memberIdx, resizes, r => - app + appMethods .shardSizes(r.view.mapValues(v => ShardInMemoryLimit(v.softLimit, v.hardLimit)).toMap) .map(_.view.mapValues(v => TShardInMemoryLimit(v.softLimit, v.hardLimit)).toMap)(ExecutionContext.parasitic), ) @@ -354,7 +361,7 @@ up ready and start routing user requests to it. SleepNodeApiCmd, memberIdx, (nodeId, namespaceFromParam(namespace)), - t => app.requestNodeSleep(t._1, t._2), + t => appMethods.requestNodeSleep(t._1, t._2), ) } diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2AlgorithmEndpoints.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2AlgorithmEndpoints.scala index 47351972..5182383c 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2AlgorithmEndpoints.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2AlgorithmEndpoints.scala @@ -106,7 +106,7 @@ object V2AlgorithmEndpointEntities extends TapirJsonCirce { } -trait V2AlgorithmEndpoints extends V2EndpointDefinitions { +trait V2AlgorithmEndpoints extends V2QuineEndpointDefinitions { import V2AlgorithmEndpointEntities._ @@ -210,7 +210,9 @@ concatenated to produce the final file name: saveLocation, ), t => - Future.successful(app.algorithmSaveRandomWalks(t._1, t._2, t._3, t._4, t._5, t._6, t._7, t._8, t._9, t._10)), + Future.successful( + appMethods.algorithmSaveRandomWalks(t._1, t._2, t._3, t._4, t._5, t._6, t._7, t._8, t._9, t._10), + ), ) } @@ -245,7 +247,7 @@ concatenated to produce the final file name: GenerateRandomWalkApiCmd, memberIdx, (id, walkLengthOpt, queryOpt, returnOpt, inOutOpt, randomSeedOpt, namespaceFromParam(namespace), atTimeOpt), - t => app.algorithmRandomWalk(t._1, t._2, t._3, t._4, t._5, t._6, t._7, t._8), + t => appMethods.algorithmRandomWalk(t._1, t._2, t._3, t._4, t._5, t._6, t._7, t._8), ) } diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2CypherEndpoints.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2CypherEndpoints.scala index 71147b62..caaafa13 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2CypherEndpoints.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2CypherEndpoints.scala @@ -45,7 +45,7 @@ object V2CypherEndpointEntities extends TapirJsonCirce { .schemaForMap[String, Json](identity) } -trait V2CypherEndpoints extends V2EndpointDefinitions { +trait V2CypherEndpoints extends V2QuineEndpointDefinitions { implicit val cypherQueryResultDecoder: Decoder[CypherQueryResult] = deriveDecoder[CypherQueryResult] implicit val cypherQueryResultEncoder: Encoder[CypherQueryResult] = deriveEncoder[CypherQueryResult] @@ -84,7 +84,7 @@ trait V2CypherEndpoints extends V2EndpointDefinitions { CypherPostApiCmd, memberIdx, (atTime, timeout, namespaceFromParam(namespace), CypherQuery(query.text, query.parameters)), - t => app.cypherPost(t._1, toConcreteDuration(t._2), t._3, t._4), + t => appMethods.cypherPost(t._1, toConcreteDuration(t._2), t._3, t._4), ) } @@ -103,7 +103,7 @@ trait V2CypherEndpoints extends V2EndpointDefinitions { CypherNodesPostApiCmd, memberIdx, (atTime, timeout, namespaceFromParam(namespace), CypherQuery(query.text, query.parameters)), - t => app.cypherNodesPost(t._1, toConcreteDuration(t._2), t._3, t._4), + t => appMethods.cypherNodesPost(t._1, toConcreteDuration(t._2), t._3, t._4), ) } @@ -122,7 +122,7 @@ trait V2CypherEndpoints extends V2EndpointDefinitions { CypherEdgesPostApiCmd, memberIdx, (atTime, timeout, namespaceFromParam(namespace), CypherQuery(query.text, query.parameters)), - t => app.cypherEdgesPost(t._1, toConcreteDuration(t._2), t._3, t._4), + t => appMethods.cypherEdgesPost(t._1, toConcreteDuration(t._2), t._3, t._4), ) } diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2DebugEndpoints.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2DebugEndpoints.scala index c2acdaab..ae769ad4 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2DebugEndpoints.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2DebugEndpoints.scala @@ -71,7 +71,7 @@ the two nodes at the edge's endpoints contain half edges that: } -trait V2DebugEndpoints extends V2EndpointDefinitions { +trait V2DebugEndpoints extends V2QuineEndpointDefinitions { val idPathElement: EndpointInput.PathCapture[QuineId] = path[QuineId]("id").description("Node id") val propKeyParameter: EndpointInput.Query[String] = query[String]("key").description( @@ -124,7 +124,7 @@ closely as possible to how they would be emitted by DebugOpsPropertygetApiCmd, memberIdx, (id, propKey, atime, namespaceFromParam(ns)), - t => app.debugOpsPropertyGet(t._1, t._2, t._3, t._4), + t => appMethods.debugOpsPropertyGet(t._1, t._2, t._3, t._4), ) } @@ -144,7 +144,7 @@ closely as possible to how they would be emitted by DebugOpsGetApiCmd, memberIdx, (id, atime, namespaceFromParam(ns)), - t => app.debugOpsGet(t._1, t._2, t._3), + t => appMethods.debugOpsGet(t._1, t._2, t._3), ) } @@ -162,7 +162,7 @@ closely as possible to how they would be emitted by DebugVerboseApiCmd, memberIdx, (id, atime, namespaceFromParam(ns)), - t => app.debugOpsVerbose(t._1, t._2, t._3), + t => appMethods.debugOpsVerbose(t._1, t._2, t._3), ) } @@ -188,7 +188,7 @@ closely as possible to how they would be emitted by DebugEdgesGetApiCmd, memberIdx, (id, atime, limit, edgeDirOpt, otherOpt, edgeTypeOpt, namespaceFromParam(ns)), - t => app.debugOpsEdgesGet(t._1, t._2, t._3, t._4, t._5, t._6, t._7), + t => appMethods.debugOpsEdgesGet(t._1, t._2, t._3, t._4, t._5, t._6, t._7), ) } @@ -214,7 +214,7 @@ closely as possible to how they would be emitted by DebugHalfEdgesGetApiCmd, memberIdx, (id, atime, limit, edgeDirOpt, otherOpt, edgeTypeOpt, namespaceFromParam(ns)), - t => app.debugOpsEdgesGet(t._1, t._2, t._3, t._4, t._5, t._6, t._7), + t => appMethods.debugOpsEdgesGet(t._1, t._2, t._3, t._4, t._5, t._6, t._7), ) } diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEndpoints.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEndpoints.scala index 11f60ac2..d3dac883 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEndpoints.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEndpoints.scala @@ -11,10 +11,9 @@ import sttp.tapir.{Endpoint, EndpointInput, Schema, path} import com.thatdot.quine.app.v2api.definitions._ import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{IngestConfiguration => V2IngestConfiguration} import com.thatdot.quine.graph.NamespaceId -import com.thatdot.quine.routes.{IngestStreamInfo, IngestStreamInfoWithName, IngestStreamStatus} -trait V2IngestEndpoints extends V2EndpointDefinitions with V2IngestEntitySchemas { +import com.thatdot.quine.routes.{IngestStreamInfo, IngestStreamInfoWithName} +trait V2IngestEndpoints extends V2QuineEndpointDefinitions with V2IngestSchemas { - implicit val ingestStreamStatusEncoder: Encoder[IngestStreamStatus] = Encoder.encodeString.contramap(_.toString) private val ingestStreamNameElement: EndpointInput.PathCapture[String] = path[String]("name").description("Ingest stream name") @@ -44,7 +43,7 @@ trait V2IngestEndpoints extends V2EndpointDefinitions with V2IngestEntitySchemas CreateIngestApiCmd, memberIdx, (ingestStreamName, ingestStreamConfig, namespaceFromParam(ns)), - t => Future.successful(app.createIngestStream(t._1, t._2, t._3)), + t => Future.successful(appMethods.createIngestStream(t._1, t._2, t._3)), ) } @@ -60,7 +59,7 @@ trait V2IngestEndpoints extends V2EndpointDefinitions with V2IngestEntitySchemas PauseIngestApiCmd, memberIdx, (ingestStreamName, namespaceFromParam(ns)), - t => app.pauseIngestStream(t._1, t._2), + t => appMethods.pauseIngestStream(t._1, t._2), ) } @@ -76,7 +75,7 @@ trait V2IngestEndpoints extends V2EndpointDefinitions with V2IngestEntitySchemas UnpauseIngestApiCmd, memberIdx, (ingestStreamName, namespaceFromParam(ns)), - t => app.unpauseIngestStream(t._1, t._2), + t => appMethods.unpauseIngestStream(t._1, t._2), ) } @@ -94,7 +93,7 @@ trait V2IngestEndpoints extends V2EndpointDefinitions with V2IngestEntitySchemas CreateIngestApiCmd, memberIdx, (ingestStreamName, namespaceFromParam(ns)), - t => app.deleteIngestStream(t._1, t._2), + t => appMethods.deleteIngestStream(t._1, t._2), ) } @@ -109,7 +108,7 @@ trait V2IngestEndpoints extends V2EndpointDefinitions with V2IngestEntitySchemas IngestStatusApiCmd, memberIdx, (ingestStreamName, namespaceFromParam(ns)), - t => app.ingestStreamStatus(t._1, t._2), + t => appMethods.ingestStreamStatus(t._1, t._2), ) } @@ -126,7 +125,7 @@ trait V2IngestEndpoints extends V2EndpointDefinitions with V2IngestEntitySchemas IngestStatusApiCmd, memberIdx, namespaceFromParam(ns), - ns => app.listIngestStreams(ns), + ns => appMethods.listIngestStreams(ns), ) } diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEntitySchemas.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestSchemas.scala similarity index 97% rename from quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEntitySchemas.scala rename to quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestSchemas.scala index fa43f20d..1cddc133 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestEntitySchemas.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2IngestSchemas.scala @@ -21,7 +21,7 @@ import sttp.tapir.{Codec, DecodeResult, Schema} import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{IngestFormat, _} import com.thatdot.quine.routes.CsvCharacter.{Backslash, Comma, DoubleQuote} import com.thatdot.quine.routes.{KinesisIngest => V1KinesisIngest, _} -trait V2IngestEntitySchemas extends TapirJsonCirce { +trait V2IngestSchemas extends TapirJsonCirce { implicit val csvCharacterSchema: Schema[CsvCharacter] = Schema.derived[CsvCharacter] implicit val recordDecodingTypeSchema: Schema[RecordDecodingType] = Schema.derived[RecordDecodingType] @@ -118,5 +118,5 @@ trait V2IngestEntitySchemas extends TapirJsonCirce { implicit val config = ingestSourceTypeConfig deriveConfiguredDecoder[OnStreamErrorHandler] } - + implicit val ingestStreamStatusEncoder: Encoder[IngestStreamStatus] = Encoder.encodeString.contramap(_.toString) } diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2NamespaceEndpoints.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2NamespaceEndpoints.scala index 976e7981..a75608ea 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2NamespaceEndpoints.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2NamespaceEndpoints.scala @@ -10,7 +10,7 @@ import sttp.tapir.{Schema, path} import com.thatdot.quine.app.v2api.definitions._ /** Placeholder route to demonstrate V2. Not intended to represent a final endpoint. */ -trait V2NamespaceEndpoints extends V2EndpointDefinitions { +trait V2NamespaceEndpoints extends V2QuineEndpointDefinitions { private def namespaceEndpoint[T](implicit schema: Schema[ObjectEnvelope[T]], @@ -23,7 +23,7 @@ trait V2NamespaceEndpoints extends V2EndpointDefinitions { ], ObjectEnvelope[List[String]], Any, Future] = namespaceEndpoint[List[String]].get .serverLogic(memberIdx => - runServerLogic[Unit, List[String]](GetNamespaces, memberIdx, (), _ => app.getNamespaces), + runServerLogic[Unit, List[String]](GetNamespaces, memberIdx, (), _ => appMethods.getNamespaces), ) val createNamespaceEndpoint: ServerEndpoint.Full[Unit, Unit, (Option[Int], String), ErrorEnvelope[ @@ -33,7 +33,7 @@ trait V2NamespaceEndpoints extends V2EndpointDefinitions { .in(path[String]("namespace")) .put .serverLogic { case (memberIdx, namespace) => - runServerLogic[String, Boolean](CreateNamespace, memberIdx, namespace, app.createNamespace) + runServerLogic[String, Boolean](CreateNamespace, memberIdx, namespace, appMethods.createNamespace) } val deleteNamespaceEndpoint: ServerEndpoint.Full[Unit, Unit, (Option[Int], String), ErrorEnvelope[ @@ -43,6 +43,6 @@ trait V2NamespaceEndpoints extends V2EndpointDefinitions { .in(path[String]("namespace")) .delete .serverLogic { case (memberIdx, namespace) => - runServerLogic(DeleteNamespace, memberIdx, namespace, app.deleteNamespace) + runServerLogic(DeleteNamespace, memberIdx, namespace, appMethods.deleteNamespace) } } diff --git a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2StandingEndpoints.scala b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2StandingEndpoints.scala index a442b65a..b2f4a575 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2StandingEndpoints.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/v2api/endpoints/V2StandingEndpoints.scala @@ -15,7 +15,7 @@ import com.thatdot.quine.graph.NamespaceId import com.thatdot.quine.routes.StandingQueryPattern.StandingQueryMode import com.thatdot.quine.routes.{RegisteredStandingQuery, StandingQueryDefinition, StandingQueryResultOutputUserDef} -trait V2StandingEndpoints extends V2EndpointDefinitions { +trait V2StandingEndpoints extends V2QuineEndpointDefinitions { private val sqModesMap: Map[String, StandingQueryMode] = StandingQueryMode.values.map(s => (s.toString -> s)).toMap implicit val sqModeEncoder: Encoder[StandingQueryMode] = Encoder.encodeString.contramap(_.toString) @@ -58,7 +58,7 @@ trait V2StandingEndpoints extends V2EndpointDefinitions { ListSQsApiCmd, memberIdx, namespaceFromParam(namespace), - ns => app.listStandingQueries(ns), + ns => appMethods.listStandingQueries(ns), ) } @@ -90,7 +90,7 @@ trait V2StandingEndpoints extends V2EndpointDefinitions { PropagateSQsApiCmd, memberIdx, (includeSleeping, namespaceFromParam(namespace), wakeUpParallelism.getOrElse(4)), - t => app.propagateStandingQuery(t._1, t._2, t._3), + t => appMethods.propagateStandingQuery(t._1, t._2, t._3), ) } @@ -114,7 +114,7 @@ trait V2StandingEndpoints extends V2EndpointDefinitions { CreateSQOutputApiCmd, memberIdx, (sqName, sqOutputName, namespaceFromParam(namespace), outputDef), - t => app.addSQOutput(t._1, t._2, t._3, t._4), + t => appMethods.addSQOutput(t._1, t._2, t._3, t._4), ) } @@ -143,7 +143,7 @@ trait V2StandingEndpoints extends V2EndpointDefinitions { CreateSQApiCmd, memberIdx, (sqName, namespaceFromParam(namespace), definition), - t => app.createSQ(t._1, t._2, t._3), + t => appMethods.createSQ(t._1, t._2, t._3), ) } @@ -161,7 +161,7 @@ trait V2StandingEndpoints extends V2EndpointDefinitions { DeleteSQOutputApiCmd, memberIdx, (standingQueryName, namespaceFromParam(namespace)), - t => app.deleteSQ(t._1, t._2), + t => appMethods.deleteSQ(t._1, t._2), ) } @@ -181,7 +181,7 @@ trait V2StandingEndpoints extends V2EndpointDefinitions { DeleteSQOutputApiCmd, memberIdx, (sqName, sqOutputName, namespaceFromParam(namespace)), - t => app.deleteSQOutput(t._1, t._2, t._3), + t => appMethods.deleteSQOutput(t._1, t._2, t._3), ) } @@ -199,7 +199,7 @@ trait V2StandingEndpoints extends V2EndpointDefinitions { GetSQApiCmd, memberIdx, (sqName, namespaceFromParam(namespace)), - t => app.getSQ(t._1, t._2), + t => appMethods.getSQ(t._1, t._2), ) } diff --git a/quine/src/test/scala/com/thatdot/quine/ingest2/IngestCodecTest.scala b/quine/src/test/scala/com/thatdot/quine/ingest2/IngestCodecTest.scala index 49ccce52..0545d07d 100644 --- a/quine/src/test/scala/com/thatdot/quine/ingest2/IngestCodecTest.scala +++ b/quine/src/test/scala/com/thatdot/quine/ingest2/IngestCodecTest.scala @@ -15,7 +15,7 @@ import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{ IngestConfiguration, ProtobufIngestFormat, } -import com.thatdot.quine.app.v2api.endpoints.{V2IngestEntities, V2IngestEntitySchemas} +import com.thatdot.quine.app.v2api.endpoints.{V2IngestEntities, V2IngestSchemas} import com.thatdot.quine.routes.FileIngestMode.Regular import com.thatdot.quine.routes.KafkaAutoOffsetReset.Latest import com.thatdot.quine.routes.KafkaOffsetCommitting.ExplicitCommit @@ -28,7 +28,7 @@ class IngestCodecTest extends AnyFunSuite with ScalaCheckDrivenPropertyChecks with ArbitraryIngests - with V2IngestEntitySchemas { + with V2IngestSchemas { def testJsonEncodeDecode[V: Encoder: Decoder](v: V): Assertion = { val json = v.asJson