From 5521032e1ab967a0802b64fd66c60f4c01369099 Mon Sep 17 00:00:00 2001 From: Simon Date: Mon, 8 Jan 2024 11:51:00 +0100 Subject: [PATCH] Add delta nnode header to responses (#4624) Co-authored-by: Simon Dumas --- .../ch/epfl/bluebrain/nexus/delta/Main.scala | 32 +++++++++++-------- .../sourcing/config/ProjectionConfig.scala | 12 ++++++- .../config/ProjectionConfigSuite.scala | 28 ++++++++++++++++ .../bluebrain/nexus/tests/HttpClient.scala | 13 ++++++-- 4 files changed, 68 insertions(+), 17 deletions(-) create mode 100644 delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfigSuite.scala diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/Main.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/Main.scala index 07f499b575..8e2ad42606 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/Main.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/Main.scala @@ -4,6 +4,7 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.adapter._ import akka.actor.{ActorSystem => ActorSystemClassic} import akka.http.scaladsl.Http +import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route, RouteResult} import cats.effect.{ExitCode, IO, IOApp, Resource} import cats.syntax.all._ @@ -117,21 +118,24 @@ object Main extends IOApp { ) ) - private def routes(locator: Locator): Route = { + private def routes(locator: Locator, clusterConfig: ClusterConfig): Route = { import akka.http.scaladsl.server.Directives._ import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives._ - cors(locator.get[CorsSettings]) { - handleExceptions(locator.get[ExceptionHandler]) { - handleRejections(locator.get[RejectionHandler]) { - uriPrefix(locator.get[BaseUri].base) { - encodeResponse { - val (strict, rest) = locator.get[Set[PriorityRoute]].partition(_.requiresStrictEntity) - concat( - concat(rest.toVector.sortBy(_.priority).map(_.route): _*), - locator.get[StrictEntity].apply() { - concat(strict.toVector.sortBy(_.priority).map(_.route): _*) - } - ) + val nodeHeader = RawHeader("X-Delta-Node", clusterConfig.nodeIndex.toString) + respondWithHeader(nodeHeader) { + cors(locator.get[CorsSettings]) { + handleExceptions(locator.get[ExceptionHandler]) { + handleRejections(locator.get[RejectionHandler]) { + uriPrefix(locator.get[BaseUri].base) { + encodeResponse { + val (strict, rest) = locator.get[Set[PriorityRoute]].partition(_.requiresStrictEntity) + concat( + concat(rest.toVector.sortBy(_.priority).map(_.route): _*), + locator.get[StrictEntity].apply() { + concat(strict.toVector.sortBy(_.priority).map(_.route): _*) + } + ) + } } } } @@ -150,7 +154,7 @@ object Main extends IOApp { cfg.http.interface, cfg.http.port ) - .bindFlow(RouteResult.routeToFlow(routes(locator))) + .bindFlow(RouteResult.routeToFlow(routes(locator, cfg.projections.cluster))) ) ) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfig.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfig.scala index 3c3f5a2792..33312c1f63 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfig.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfig.scala @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.config import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategyConfig import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig.ClusterConfig import pureconfig.ConfigReader +import pureconfig.error.FailureReason import pureconfig.generic.semiauto.deriveReader import scala.concurrent.duration.FiniteDuration @@ -56,7 +57,16 @@ object ProjectionConfig { object ClusterConfig { implicit final val clusterConfigReader: ConfigReader[ClusterConfig] = - deriveReader[ClusterConfig] + deriveReader[ClusterConfig].emap { config => + Either.cond( + config.nodeIndex < config.size, + config, + new FailureReason { + override def description: String = s"'node-index' must be smaller than 'size'" + } + ) + + } } } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfigSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfigSuite.scala new file mode 100644 index 0000000000..a90803208f --- /dev/null +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/config/ProjectionConfigSuite.scala @@ -0,0 +1,28 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.config + +import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig.ClusterConfig +import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite +import pureconfig.ConfigSource + +class ProjectionConfigSuite extends NexusSuite { + + private def parseConfig(nodeIndex: Int, size: Int) = + ConfigSource + .string(s""" + |cluster { + | node-index = $nodeIndex + | size = $size + |} + |""".stripMargin) + .at("cluster") + .load[ClusterConfig] + + test("Parse successfully when the node index is lower than the cluster size") { + parseConfig(1, 2).assertRight(ClusterConfig(2, 1)) + } + + test("Fail to parse when the node index is higher than the cluster size") { + parseConfig(2, 1).assertLeft() + } + +} diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala index fa60adb097..7d2ff5a360 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/HttpClient.scala @@ -37,6 +37,9 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit private def fromFuture[A](future: => Future[A]) = IO.fromFuture { IO.delay(future) } + private def assertDeltaNodeHeader(response: HttpResponse) = + response.headers.map(_.name()) should contain("X-Delta-Node") withClue "A default header is missing." + def apply(req: HttpRequest): IO[HttpResponse] = fromFuture(httpExt.singleRequest(req)) @@ -133,7 +136,10 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit val entity = HttpEntity(contentType, s.getBytes) FormData(BodyPart.Strict("file", entity, Map("filename" -> fileName))).toEntity() }, - (a: A, response: HttpResponse) => assertResponse(a, response) withClue buildClue(a, response), + (a: A, response: HttpResponse) => { + assertDeltaNodeHeader(response) + assertResponse(a, response) withClue buildClue(a, response) + }, extraHeaders ) } @@ -185,7 +191,10 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit url, body, identity, - (a: A, response: HttpResponse) => assertResponse(a, response) withClue buildClue(a, response), + (a: A, response: HttpResponse) => { + assertDeltaNodeHeader(response) + assertResponse(a, response) withClue buildClue(a, response) + }, extraHeaders ) }