Skip to content

Commit

Permalink
Migrate integration tests to use Cats Effect
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Dumas committed Oct 6, 2023
1 parent c1d5d65 commit 20ee483
Show file tree
Hide file tree
Showing 34 changed files with 395 additions and 461 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ import org.scalactic.source
import org.scalatest.Assertion
import org.scalatest.Assertions._

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag

trait CatsIOValues {
trait CatsIOValues extends CatsIOValuesLowPrio {

implicit def ioToFutureAssertion(io: IO[Assertion]): Future[Assertion] = io.unsafeToFuture()

implicit def futureListToFutureAssertion(future: Future[List[Assertion]])(implicit
ec: ExecutionContext
): Future[Assertion] =
future.map(_ => succeed)

implicit final class CatsIOValuesOps[A](private val io: IO[A]) {
def accepted: A = io.unsafeRunSync()
Expand All @@ -31,3 +39,8 @@ trait CatsIOValues {
}

}

trait CatsIOValuesLowPrio {
implicit def ioListToFutureAssertion(io: IO[List[Assertion]])(implicit ec: ExecutionContext): Future[Assertion] =
io.unsafeToFuture().map(_ => succeed)
}
97 changes: 42 additions & 55 deletions tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/BaseSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.testkit.ScalatestRouteTest
import akka.util.ByteString
import cats.implicits._
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.testkit._
import ch.epfl.bluebrain.nexus.testkit.ce.CatsIOValues
import ch.epfl.bluebrain.nexus.tests.BaseSpec._
import ch.epfl.bluebrain.nexus.tests.HttpClient._
import ch.epfl.bluebrain.nexus.tests.Identity.{allUsers, testClient, testRealm, _}
Expand All @@ -19,16 +23,14 @@ import ch.epfl.bluebrain.nexus.tests.iam.types.Permission.Organizations
import ch.epfl.bluebrain.nexus.tests.iam.{AclDsl, PermissionDsl}
import ch.epfl.bluebrain.nexus.tests.kg.{ElasticSearchViewsDsl, KgDsl}
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.Logger
import io.circe.Json
import monix.bio.Task
import monix.execution.Scheduler.Implicits.global
import org.scalactic.source.Position
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpecLike
import org.scalatest.{Assertion, BeforeAndAfterAll, OptionValues}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

trait BaseSpec
Expand All @@ -42,12 +44,14 @@ trait BaseSpec
with TestHelpers
with ScalatestRouteTest
with Eventually
with IOValues
with CatsIOValues
with OptionValues
with ScalaFutures
with Matchers {

private val logger = Logger[this.type]
implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

private val logger = Logger.cats[this.type]

implicit val config: TestsConfig = load[TestsConfig](ConfigFactory.load(), "tests")

Expand All @@ -67,15 +71,10 @@ trait BaseSpec

implicit override def patienceConfig: PatienceConfig = PatienceConfig(config.patience, 300.millis)

def eventually(t: Task[Assertion])(implicit pos: Position): Assertion =
eventually {
t.runSyncUnsafe()
}
def eventually(io: IO[Assertion])(implicit pos: Position): Assertion =
eventually { io.unsafeRunSync() }

def runTask[A](t: Task[A]): Assertion =
t.map { _ =>
succeed
}.runSyncUnsafe()
def runIO[A](io: IO[A]): Assertion = io.map { _ => succeed }.unsafeRunSync()

override def beforeAll(): Unit = {
super.beforeAll()
Expand All @@ -102,32 +101,28 @@ trait BaseSpec

val allTasks = for {
isSetupCompleted <- setupCompleted.get
_ <- Task.unless(isSetupCompleted)(setup)
_ <- IO.unlessA(isSetupCompleted)(setup)
_ <- setupCompleted.set(true)
_ <- aclDsl.cleanAclsAnonymous
} yield ()

allTasks.runSyncUnsafe()

allTasks.unsafeRunSync()
}

override def afterAll(): Unit =
Task.when(config.cleanUp)(elasticsearchDsl.deleteAllIndices().void).runSyncUnsafe()
IO.whenA(config.cleanUp)(elasticsearchDsl.deleteAllIndices().void).unsafeRunSync()

protected def toAuthorizationHeader(token: String) =
Authorization(
HttpCredentials.createOAuth2BearerToken(token)
)

private[tests] def authenticateUser(user: UserCredentials, client: ClientCredentials): Task[Unit] = {
keycloakDsl.userToken(user, client).map { token =>
logger.info(s"Token for user ${user.name} is: $token")
tokensMap.put(user, toAuthorizationHeader(token))
()
}
}
Authorization(HttpCredentials.createOAuth2BearerToken(token))

private[tests] def authenticateClient(client: ClientCredentials): Task[Unit] = {
private[tests] def authenticateUser(user: UserCredentials, client: ClientCredentials): IO[Unit] =
for {
token <- keycloakDsl.userToken(user, client)
_ <- logger.info(s"Token for user ${user.name} is: $token")
_ <- IO(tokensMap.put(user, toAuthorizationHeader(token)))
} yield ()

private[tests] def authenticateClient(client: ClientCredentials): IO[Unit] = {
keycloakDsl.serviceAccountToken(client).map { token =>
tokensMap.put(client, toAuthorizationHeader(token))
()
Expand All @@ -152,35 +147,29 @@ trait BaseSpec
identity: Identity,
client: ClientCredentials,
users: List[UserCredentials]
): Task[Unit] = {
def createRealmInDelta: Task[Assertion] =
): IO[Unit] = {
def createRealmInDelta: IO[Assertion] =
deltaClient.get[Json](s"/realms/${realm.name}", identity) { (json, response) =>
runTask {
runIO {
response.status match {
case StatusCodes.NotFound =>
logger.info(s"Realm ${realm.name} is absent, we create it")
val body =
jsonContentOf(
"/iam/realms/create.json",
"realm" -> s"${config.realmSuffix(realm)}"
)
for {
_ <- deltaClient.put[Json](s"/realms/${realm.name}", body, identity) { (_, response) =>
response.status shouldEqual StatusCodes.Created
}
_ <- deltaClient.get[Json](s"/realms/${realm.name}", Identity.ServiceAccount) { (_, response) =>
response.status shouldEqual StatusCodes.OK
}
_ <- logger.info(s"Realm ${realm.name} is absent, we create it")
_ <- deltaClient.put[Json](s"/realms/${realm.name}", body, identity) { expectCreated }
_ <- deltaClient.get[Json](s"/realms/${realm.name}", Identity.ServiceAccount) { expectOk }
} yield ()
case StatusCodes.Forbidden | StatusCodes.OK =>
logger.info(s"Realm ${realm.name} has already been created, we got status ${response.status}")
deltaClient.get[Json](s"/realms/${realm.name}", Identity.ServiceAccount) { (_, response) =>
response.status shouldEqual StatusCodes.OK
}
for {
_ <- logger.info(s"Realm ${realm.name} has already been created, we got status ${response.status}")
_ <- deltaClient.get[Json](s"/realms/${realm.name}", Identity.ServiceAccount) { expectOk }
} yield ()
case s =>
Task(
fail(s"$s wasn't expected here and we got this response: $json")
)
IO(fail(s"$s wasn't expected here and we got this response: $json"))
}
}
}
Expand All @@ -189,24 +178,22 @@ trait BaseSpec
// Create the realm in Keycloak
_ <- keycloakDsl.importRealm(realm, client, users)
// Get the tokens and cache them in the map
_ <- users.parTraverse { user =>
authenticateUser(user, client)
}
_ <- users.parTraverse { user => authenticateUser(user, client) }
_ <- authenticateClient(client)
// Creating the realm in delta
_ <- Task { logger.info(s"Creating realm ${realm.name} in the delta instance") }
_ <- logger.info(s"Creating realm ${realm.name} in the delta instance")
_ <- createRealmInDelta
} yield ()
}

/**
* Create projects and the parent organization for the provided user
*/
def createProjects(user: Authenticated, org: String, projects: String*): Task[Unit] =
def createProjects(user: Authenticated, org: String, projects: String*): IO[Unit] =
for {
_ <- aclDsl.addPermission("/", user, Organizations.Create)
_ <- adminDsl.createOrganization(org, org, user, ignoreConflict = true)
_ <- projects.traverse { project =>
_ <- projects.toList.traverse { project =>
val projectRef = s"$org/$project"
adminDsl.createProject(org, project, kgDsl.projectJson(name = projectRef), user)
}
Expand All @@ -230,7 +217,7 @@ trait BaseSpec
response.header[`Content-Encoding`].value.encodings

private[tests] def decodeGzip(input: ByteString): String =
Coders.Gzip.decode(input).map(_.utf8String)(global).futureValue
Coders.Gzip.decode(input).map(_.utf8String).futureValue

private[tests] def genId(length: Int = 15): String =
genString(length = length, Vector.range('a', 'z') ++ Vector.range('0', '9'))
Expand All @@ -251,6 +238,6 @@ trait BaseSpec

object BaseSpec {

val setupCompleted: IORef[Boolean] = IORef.unsafe(false)
val setupCompleted: Ref[IO, Boolean] = Ref.unsafe(false)

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import akka.http.scaladsl.model.HttpMethods.GET
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{HttpRequest, MediaRange, MediaType}
import akka.stream.Materializer
import cats.effect.{ContextShift, IO}
import ch.epfl.bluebrain.nexus.testkit.{CirceLiteral, TestHelpers}
import io.circe.optics.JsonPath.root
import monix.bio.Task
import monix.execution.Scheduler.Implicits.global
import org.scalatest.matchers.should.Matchers

class BlazegraphDsl(implicit as: ActorSystem, materializer: Materializer)
extends TestHelpers
import scala.concurrent.ExecutionContext

class BlazegraphDsl(implicit
as: ActorSystem,
materializer: Materializer,
contextShift: ContextShift[IO],
ec: ExecutionContext
) extends TestHelpers
with CirceLiteral
with CirceUnmarshalling
with Matchers {
Expand All @@ -39,17 +44,14 @@ class BlazegraphDsl(implicit as: ActorSystem, materializer: Materializer)
all should not contain allElementsOf(namespaces)
}

def allNamespaces: Task[List[String]] = {
def allNamespaces: IO[List[String]] = {
blazegraphClient(
HttpRequest(
method = GET,
uri = s"$blazegraphUrl/blazegraph/namespace?describe-each-named-graph=false"
).addHeader(Accept(sparqlJsonRange))
).flatMap { res =>
Task
.deferFuture {
jsonUnmarshaller(res.entity)(global, materializer)
}
IO.fromFuture(IO(jsonUnmarshaller(res.entity)))
.map { json =>
root.results.bindings.each.filter(filterNamespaces).`object`.value.string.getAll(json)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,41 @@ import akka.http.scaladsl.model.HttpMethods.{DELETE, GET, PUT}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpRequest, StatusCode}
import akka.stream.Materializer
import cats.effect.{ContextShift, IO}
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.testkit.{CirceLiteral, TestHelpers}
import com.typesafe.scalalogging.Logger
import monix.bio.Task
import monix.execution.Scheduler.Implicits.global
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext

class ElasticsearchDsl(implicit as: ActorSystem, materializer: Materializer)
extends TestHelpers
class ElasticsearchDsl(implicit
as: ActorSystem,
materializer: Materializer,
contextShift: ContextShift[IO],
ec: ExecutionContext
) extends TestHelpers
with CirceLiteral
with CirceUnmarshalling
with Matchers {

private val logger = Logger[this.type]
private val logger = Logger.cats[this.type]

private val elasticUrl = s"http://${sys.props.getOrElse("elasticsearch-url", "localhost:9200")}"
private val elasticClient = HttpClient(elasticUrl)
private val credentials = BasicHttpCredentials("elastic", "password")

def createTemplate(): Task[StatusCode] = {
logger.info("Creating template for Elasticsearch indices")

def createTemplate(): IO[StatusCode] = {
val json = jsonContentOf("/elasticsearch/template.json")

elasticClient(
HttpRequest(
method = PUT,
uri = s"$elasticUrl/_index_template/test_template",
entity = HttpEntity(ContentTypes.`application/json`, json.noSpaces)
).addCredentials(credentials)
).onErrorRestartLoop((10, 10.seconds)) { (err, state, retry) =>
val (maxRetries, delay) = state
if (maxRetries > 0)
retry((maxRetries - 1, delay)).delayExecution(delay)
else
Task.raiseError(err)
}.tapError { t =>
Task { logger.error(s"Error while importing elasticsearch template", t) }
}.map { res =>
logger.info(s"Importing the elasticsearch template returned ${res.status}")
res.status
}
logger.info("Creating template for Elasticsearch indices") >>
elasticClient(
HttpRequest(
method = PUT,
uri = s"$elasticUrl/_index_template/test_template",
entity = HttpEntity(ContentTypes.`application/json`, json.noSpaces)
).addCredentials(credentials)
).map(_.status)
}

def includes(indices: String*) =
Expand All @@ -60,32 +52,28 @@ class ElasticsearchDsl(implicit as: ActorSystem, materializer: Materializer)
all should not contain allElementsOf(indices)
}

def allIndices: Task[List[String]] = {
def allIndices: IO[List[String]] = {
elasticClient(
HttpRequest(
method = GET,
uri = s"$elasticUrl/_aliases"
).addCredentials(credentials)
).flatMap { res =>
Task
.deferFuture {
jsonUnmarshaller(res.entity)(global, materializer)
}
IO.fromFuture(IO(jsonUnmarshaller(res.entity)))
.map(_.asObject.fold(List.empty[String])(_.keys.toList))
}
}

def deleteAllIndices(): Task[StatusCode] =
def deleteAllIndices(): IO[StatusCode] =
elasticClient(
HttpRequest(
method = DELETE,
uri = s"$elasticUrl/delta_*"
).addCredentials(credentials)
).tapError { t =>
Task { logger.error(s"Error while deleting elasticsearch indices", t) }
}.map { res =>
logger.info(s"Deleting elasticsearch indices returned ${res.status}")
res.status
).onError { t =>
logger.error(t)(s"Error while deleting elasticsearch indices")
}.flatMap { res =>
logger.info(s"Deleting elasticsearch indices returned ${res.status}").as(res.status)
}

}
Loading

0 comments on commit 20ee483

Please sign in to comment.