Skip to content

Commit

Permalink
Add delta nnode header to responses (#4624)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Jan 8, 2024
1 parent 87e9422 commit 5521032
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 17 deletions.
32 changes: 18 additions & 14 deletions delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.actor.{ActorSystem => ActorSystemClassic}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route, RouteResult}
import cats.effect.{ExitCode, IO, IOApp, Resource}
import cats.syntax.all._
Expand Down Expand Up @@ -117,21 +118,24 @@ object Main extends IOApp {
)
)

private def routes(locator: Locator): Route = {
private def routes(locator: Locator, clusterConfig: ClusterConfig): Route = {
import akka.http.scaladsl.server.Directives._
import ch.epfl.bluebrain.nexus.delta.sdk.directives.UriDirectives._
cors(locator.get[CorsSettings]) {
handleExceptions(locator.get[ExceptionHandler]) {
handleRejections(locator.get[RejectionHandler]) {
uriPrefix(locator.get[BaseUri].base) {
encodeResponse {
val (strict, rest) = locator.get[Set[PriorityRoute]].partition(_.requiresStrictEntity)
concat(
concat(rest.toVector.sortBy(_.priority).map(_.route): _*),
locator.get[StrictEntity].apply() {
concat(strict.toVector.sortBy(_.priority).map(_.route): _*)
}
)
val nodeHeader = RawHeader("X-Delta-Node", clusterConfig.nodeIndex.toString)
respondWithHeader(nodeHeader) {
cors(locator.get[CorsSettings]) {
handleExceptions(locator.get[ExceptionHandler]) {
handleRejections(locator.get[RejectionHandler]) {
uriPrefix(locator.get[BaseUri].base) {
encodeResponse {
val (strict, rest) = locator.get[Set[PriorityRoute]].partition(_.requiresStrictEntity)
concat(
concat(rest.toVector.sortBy(_.priority).map(_.route): _*),
locator.get[StrictEntity].apply() {
concat(strict.toVector.sortBy(_.priority).map(_.route): _*)
}
)
}
}
}
}
Expand All @@ -150,7 +154,7 @@ object Main extends IOApp {
cfg.http.interface,
cfg.http.port
)
.bindFlow(RouteResult.routeToFlow(routes(locator)))
.bindFlow(RouteResult.routeToFlow(routes(locator, cfg.projections.cluster)))
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.config
import ch.epfl.bluebrain.nexus.delta.kernel.RetryStrategyConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig.ClusterConfig
import pureconfig.ConfigReader
import pureconfig.error.FailureReason
import pureconfig.generic.semiauto.deriveReader

import scala.concurrent.duration.FiniteDuration
Expand Down Expand Up @@ -56,7 +57,16 @@ object ProjectionConfig {

object ClusterConfig {
implicit final val clusterConfigReader: ConfigReader[ClusterConfig] =
deriveReader[ClusterConfig]
deriveReader[ClusterConfig].emap { config =>
Either.cond(
config.nodeIndex < config.size,
config,
new FailureReason {
override def description: String = s"'node-index' must be smaller than 'size'"
}
)

}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.config

import ch.epfl.bluebrain.nexus.delta.sourcing.config.ProjectionConfig.ClusterConfig
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import pureconfig.ConfigSource

class ProjectionConfigSuite extends NexusSuite {

private def parseConfig(nodeIndex: Int, size: Int) =
ConfigSource
.string(s"""
|cluster {
| node-index = $nodeIndex
| size = $size
|}
|""".stripMargin)
.at("cluster")
.load[ClusterConfig]

test("Parse successfully when the node index is lower than the cluster size") {
parseConfig(1, 2).assertRight(ClusterConfig(2, 1))
}

test("Fail to parse when the node index is higher than the cluster size") {
parseConfig(2, 1).assertLeft()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit

private def fromFuture[A](future: => Future[A]) = IO.fromFuture { IO.delay(future) }

private def assertDeltaNodeHeader(response: HttpResponse) =
response.headers.map(_.name()) should contain("X-Delta-Node") withClue "A default header is missing."

def apply(req: HttpRequest): IO[HttpResponse] =
fromFuture(httpExt.singleRequest(req))

Expand Down Expand Up @@ -133,7 +136,10 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit
val entity = HttpEntity(contentType, s.getBytes)
FormData(BodyPart.Strict("file", entity, Map("filename" -> fileName))).toEntity()
},
(a: A, response: HttpResponse) => assertResponse(a, response) withClue buildClue(a, response),
(a: A, response: HttpResponse) => {
assertDeltaNodeHeader(response)
assertResponse(a, response) withClue buildClue(a, response)
},
extraHeaders
)
}
Expand Down Expand Up @@ -185,7 +191,10 @@ class HttpClient private (baseUrl: Uri, httpExt: HttpExt)(implicit
url,
body,
identity,
(a: A, response: HttpResponse) => assertResponse(a, response) withClue buildClue(a, response),
(a: A, response: HttpResponse) => {
assertDeltaNodeHeader(response)
assertResponse(a, response) withClue buildClue(a, response)
},
extraHeaders
)
}
Expand Down

0 comments on commit 5521032

Please sign in to comment.