Migrate ScopedEventLog to Cats Effect (#4460)
* Migrate ScopedEventLog to Cats Effect

* Migrate ScopedEventStore to Cats Effect

* Migrate EntityDependencyStore to Cats Effect

* Migrate PartitionInit to Cats Effect

* Change ThrowableValue to shut Scapegoat up
shinyhappydan authored Nov 6, 2023
1 parent 638047f · commit c901cb8
Showing 31 changed files with 263 additions and 274 deletions.
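The bullet points above summarise the direction of the change: ScopedEventLog (and the related ScopedEventStore, EntityDependencyStore and PartitionInit) now expose cats-effect IO instead of monix-bio, so call sites drop their trailing conversions while the collaborators that still run on monix-bio gain an explicit one. The sketch below is a minimal, self-contained illustration of that call-site shape; the names (onRead, expandIri, stateOr) are hypothetical stand-ins, and the repository's `.toCatsIO` conversion is only referenced in comments so the example compiles with cats-effect alone.

import cats.effect.IO

object CallSiteSketch {
  final case class ProjectContext(base: String)
  final case class ViewState(iri: String, rev: Int)

  // Stand-ins for fetchContext.onRead, expandIri and log.stateOr; all modelled as plain IO here.
  def onRead(project: String): IO[ProjectContext]           = IO.pure(ProjectContext("https://example.org/"))
  def expandIri(id: String, pc: ProjectContext): IO[String] = IO.pure(pc.base + id)
  def stateOr(project: String, iri: String): IO[ViewState]  = IO.pure(ViewState(iri, 1))

  // Before the commit this comprehension ran on monix-bio and ended in a single `.toCatsIO`;
  // afterwards it is plain IO and only the still-monix dependencies are converted at the call site.
  def fetchState(id: String, project: String): IO[ViewState] =
    for {
      pc    <- onRead(project)        // in the diff: fetchContext.onRead(project).toCatsIO
      iri   <- expandIri(id, pc)      // in the diff: expandIri(id.value, pc).toCatsIO
      state <- stateOr(project, iri)  // in the diff: log.stateOr(...), now returning IO natively
    } yield state
}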
@@ -0,0 +1,5 @@
+ package ch.epfl.bluebrain.nexus.delta.kernel.error
+
+ abstract class ThrowableValue extends Throwable { self =>
+   override def fillInStackTrace(): Throwable = self
+ }
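The new ThrowableValue above (reworked, per the commit message, to quiet a Scapegoat warning) is an error-value base class: it extends Throwable so instances can travel through IO's error channel, while the fillInStackTrace override returns the instance itself and therefore skips stack-trace capture. The snippet below is a hedged illustration of that intent; the RevisionMissing subclass and its use with IO.raiseError are hypothetical, not part of the diff.

import cats.effect.IO

// Same definition as in the diff, repeated so the snippet stands alone.
abstract class ThrowableValue extends Throwable { self =>
  override def fillInStackTrace(): Throwable = self // no stack trace is recorded
}

// Hypothetical rejection type: cheap to construct because no stack trace is filled in.
final case class RevisionMissing(rev: Int) extends ThrowableValue {
  override def getMessage: String = s"Revision $rev does not exist."
}

object ThrowableValueSketch {
  val failed: IO[Unit] = IO.raiseError(RevisionMissing(3))
}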
@@ -247,8 +247,8 @@ final class BlazegraphViews(
project: ProjectRef
): IO[BlazegraphViewState] = {
for {
- pc <- fetchContext.onRead(project)
- iri <- expandIri(id.value, pc)
+ pc <- fetchContext.onRead(project).toCatsIO
+ iri <- expandIri(id.value, pc).toCatsIO
notFound = ViewNotFound(iri, project)
state <- id match {
case Latest(_) => log.stateOr(project, iri, notFound)
@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph

import cats.effect.IO
- import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewRejection.{InvalidViewReferences, PermissionIsNotDefined, TooManyViewReferences}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewValue
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewValue.{AggregateBlazegraphViewValue, IndexingBlazegraphViewValue}
@@ -27,7 +26,7 @@ object ValidateBlazegraphView {
maxViewRefs,
TooManyViewReferences,
xas
- )(v.views).toCatsIO
+ )(v.views)
case v: IndexingBlazegraphViewValue =>
for {
_ <- fetchPermissions.flatMap { perms =>
@@ -249,8 +249,8 @@ final class CompositeViews private (
project: ProjectRef
): IO[CompositeViewState] = {
for {
- pc <- fetchContext.onRead(project)
- iri <- expandIri(id.value, pc)
+ pc <- fetchContext.onRead(project).toCatsIO
+ iri <- expandIri(id.value, pc).toCatsIO
notFound = ViewNotFound(iri, project)
state <- id match {
case Latest(_) => log.stateOr(project, iri, notFound)
@@ -297,7 +297,7 @@
): IO[UnscoredSearchResults[ViewResource]] = {
val scope = params.project.fold[Scope](Scope.Root)(ref => Scope.Project(ref))
SearchResults(
- log.currentStates(scope, _.toResource).evalFilter(params.matches(_).toTask),
+ log.currentStates(scope, _.toResource).evalFilter(params.matches),
pagination,
ordering
).span("listCompositeViews")
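This listing change, together with the matching ones in OrganizationsImpl, ProjectsImpl, RealmsImpl and ResolversImpl further down, drops the `.toTask` / `.toUIO` adapters around `params.matches`, which suggests the predicate now yields a cats-effect IO[Boolean] and can be handed to fs2's evalFilter directly. The following is a small, self-contained illustration of that pattern under simplified, hypothetical types (Resource and matches are not repository code).

import cats.effect.IO
import fs2.Stream

object EvalFilterSketch {
  final case class Resource(label: String, deprecated: Boolean)

  // Effectful predicate standing in for SearchParams#matches.
  def matches(r: Resource): IO[Boolean] = IO.pure(!r.deprecated)

  // evalFilter keeps only the elements for which the effectful predicate yields true.
  val filtered: IO[List[Resource]] =
    Stream
      .emits(List(Resource("a", deprecated = false), Resource("b", deprecated = true)))
      .covary[IO]
      .evalFilter(matches)
      .compile
      .toList
}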
@@ -291,7 +291,7 @@ final class ElasticSearchViews private (
case Revision(_, rev) => log.stateOr(project, iri, rev, notFound, RevisionNotFound)
case Tag(_, tag) => log.stateOr(project, iri, tag, notFound, TagNotFound(tag))
}
- }.toCatsIO
+ }

/**
* Retrieves a current IndexingElasticSearchView resource.
@@ -357,7 +357,6 @@
log
.evaluate(cmd.project, cmd.id, cmd)
.map(_._2.toResource(defaultElasticsearchMapping, defaultElasticsearchSettings))
- .toCatsIO

private def expandWithContext(
fetchCtx: ProjectRef => BIO[ElasticSearchViewRejection, ProjectContext],
@@ -382,7 +382,7 @@ final class Files(
case Revision(_, rev) => log.stateOr(id.project, iri, rev, notFound, RevisionNotFound)
case Tag(_, tag) => log.stateOr(id.project, iri, tag, notFound, TagNotFound(tag))
}
- }.toCatsIO
+ }

private def createLink(
iri: Iri,
@@ -409,9 +409,9 @@
.adaptError { case e: StorageFileRejection => LinkRejection(fileId, storage.id, e) }

private def eval(cmd: FileCommand): IO[FileResource] =
- log.evaluate(cmd.project, cmd.id, cmd).map(_._2.toResource).toCatsIO
+ log.evaluate(cmd.project, cmd.id, cmd).map(_._2.toResource)

- private def test(cmd: FileCommand) = log.dryRun(cmd.project, cmd.id, cmd).toCatsIO
+ private def test(cmd: FileCommand) = log.dryRun(cmd.project, cmd.id, cmd)

private def fetchActiveStorage(storageIdOpt: Option[IdSegment], ref: ProjectRef, pc: ProjectContext)(implicit
caller: Caller
@@ -531,7 +531,7 @@

private[files] def updateAttributes(iri: Iri, project: ProjectRef): IO[Unit] =
for {
- f <- log.stateOr(project, iri, FileNotFound(iri, project)).toCatsIO
+ f <- log.stateOr(project, iri, FileNotFound(iri, project))
storage <- storages.fetch(IdSegmentRef(f.storage), f.project).map(_.value).adaptError {
case e: StorageFetchRejection => WrappedStorageRejection(e)
}
@@ -545,7 +545,7 @@
newAttr <- fetchAttributes(storage, attr, f.id)
mediaType = attr.mediaType orElse Some(newAttr.mediaType)
command = UpdateFileAttributes(f.id, f.project, mediaType, newAttr.bytes, newAttr.digest, f.rev, f.updatedBy)
- _ <- log.evaluate(f.project, f.id, command).toCatsIO
+ _ <- log.evaluate(f.project, f.id, command)
} yield ()
}

@@ -276,7 +276,6 @@ final class Storages private (
private def fetchDefaults(project: ProjectRef): Stream[IO, StorageResource] =
log
.currentStates(Scope.Project(project), _.toResource)
- .translate(taskToIoK)
.filter(_.value.default)

/**
@@ -328,7 +327,9 @@
}

private def eval(cmd: StorageCommand): IO[StorageResource] =
- log.evaluate(cmd.project, cmd.id, cmd).toCatsIO.map(_._2.toResource)
+ log.evaluate(cmd.project, cmd.id, cmd).map { case (_, state) =>
+   state.toResource
+ }
}

object Storages {
@@ -10,7 +10,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, Res
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.{EntityCheck, Transactors}
import io.circe.Json
- import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

/**
* Aggregates the different [[ResourceShift]] to perform operations on resources independently of their types
@@ -48,7 +47,7 @@ object ResourceShifts {

override def fetch(reference: ResourceRef, project: ProjectRef): IO[Option[JsonLdContent[_, _]]] =
for {
- entityType <- EntityCheck.findType(reference.iri, project, xas).toCatsIO
+ entityType <- EntityCheck.findType(reference.iri, project, xas)
shift <- entityType.traverse(findShift)
resource <- shift.flatTraverse(_.fetch(reference, project))
} yield resource
@@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.sdk.model.search
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Query
import cats.Functor
+ import cats.effect.IO
import cats.syntax.functor._
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.{contexts, nxv}
@@ -15,7 +16,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.search.ResultEntry.UnscoredResult
import fs2.Stream
import io.circe.syntax._
import io.circe.{Encoder, Json, JsonObject}
- import monix.bio.{Task, UIO}

/**
* Defines the signature for a collection of search results with their metadata including pagination
@@ -113,11 +113,11 @@ object SearchResults {
UnscoredSearchResults[A](total, results.map(UnscoredResultEntry(_)))

final def apply[A](
- stream: Stream[Task, A],
+ stream: Stream[IO, A],
pagination: Pagination.FromPagination,
ordering: Ordering[A]
- ): UIO[UnscoredSearchResults[A]] =
- stream.compile.toList.hideErrors
+ ): IO[UnscoredSearchResults[A]] =
+ stream.compile.toList
.map { resources =>
SearchResults(
resources.size.toLong,
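With the signature change above, SearchResults.apply now consumes an fs2 Stream[IO, A], and the monix-specific `.hideErrors` call is gone, as IO already carries Throwable errors. The sketch below mirrors what the updated method does with simplified, hypothetical types; it is not the repository's SearchResults.

import cats.effect.IO
import fs2.Stream

object SearchResultsSketch {
  final case class UnscoredSearchResults[A](total: Long, results: List[A])

  // Compile the stream and wrap the collected values, as the updated apply does.
  def collect[A](stream: Stream[IO, A]): IO[UnscoredSearchResults[A]] =
    stream.compile.toList.map(items => UnscoredSearchResults(items.size.toLong, items))

  // Usage: a small in-memory stream of labels.
  val example: IO[UnscoredSearchResults[String]] =
    collect(Stream.emits(List("org1/project1", "org1/project2")).covary[IO])
}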
@@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.sdk.organizations
import cats.effect.IO._
import cats.effect.{Clock, ContextShift, IO, Timer}
import cats.syntax.all._
- import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
@@ -77,8 +76,7 @@ final class OrganizationsImpl private (
SearchResults(
log
.currentStates(_.toResource)
- .translate(ioToTaskK)
- .evalFilter(params.matches(_).toUIO),
+ .evalFilter(params.matches),
pagination,
ordering
).span("listOrganizations")
@@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.sdk.projects
import cats.effect.{Clock, ContextShift, IO, Timer}
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
- import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.kamon.KamonMetricComponent
import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
@@ -98,7 +97,7 @@ final class ProjectsImpl private (
SearchResults(
log
.currentStates(params.organization.fold(Scope.root)(Scope.Org), _.toResource(defaultApiMappings))
- .evalFilter(params.matches(_).toUIO),
+ .evalFilter(params.matches),
pagination,
ordering
).span("listProjects")
@@ -2,7 +2,6 @@ package ch.epfl.bluebrain.nexus.delta.sdk.projects

import cats.effect.IO
import cats.implicits._
- import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ProjectRejection.{ProjectDeletionIsDisabled, ProjectIsReferenced}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.EntityDependency.ReferencedBy
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
@@ -19,7 +18,7 @@ trait ValidateProjectDeletion {
object ValidateProjectDeletion {

def apply(xas: Transactors, enabled: Boolean): ValidateProjectDeletion =
- apply(EntityDependencyStore.directExternalReferences(_, xas).toCatsIO, enabled)
+ apply(EntityDependencyStore.directExternalReferences(_, xas), enabled)

def apply(fetchReferences: ProjectRef => IO[Set[ReferencedBy]], enabled: Boolean): ValidateProjectDeletion =
(project: ProjectRef) =>
@@ -17,7 +17,6 @@ import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import ch.epfl.bluebrain.nexus.delta.sourcing._
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
- import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

final class RealmsImpl private (log: RealmsLog) extends Realms {

@@ -70,7 +69,7 @@ final class RealmsImpl private (log: RealmsLog) extends Realms {
ordering: Ordering[RealmResource]
): IO[SearchResults.UnscoredSearchResults[RealmResource]] =
SearchResults(
- log.currentStates(_.toResource).translate(ioToTaskK).evalFilter(params.matches(_).toUIO),
+ log.currentStates(_.toResource).evalFilter(params.matches),
pagination,
ordering
).span("listRealms")
@@ -42,8 +42,8 @@ final class ResolversImpl private (
source: Json
)(implicit caller: Caller): IO[ResolverResource] = {
for {
- pc <- fetchContext.onCreate(projectRef)
- (iri, resolverValue) <- sourceDecoder(projectRef, pc, source)
+ pc <- fetchContext.onCreate(projectRef).toCatsIO
+ (iri, resolverValue) <- sourceDecoder(projectRef, pc, source).toCatsIO
res <- eval(CreateResolver(iri, projectRef, resolverValue, source, caller))
} yield res
}.span("createResolver")
@@ -54,9 +54,9 @@
source: Json
)(implicit caller: Caller): IO[ResolverResource] = {
for {
- pc <- fetchContext.onCreate(projectRef)
- iri <- expandIri(id, pc)
- resolverValue <- sourceDecoder(projectRef, pc, iri, source)
+ pc <- fetchContext.onCreate(projectRef).toCatsIO
+ iri <- expandIri(id, pc).toCatsIO
+ resolverValue <- sourceDecoder(projectRef, pc, iri, source).toCatsIO
res <- eval(CreateResolver(iri, projectRef, resolverValue, source, caller))
} yield res
}.span("createResolver")
@@ -67,8 +67,8 @@
resolverValue: ResolverValue
)(implicit caller: Caller): IO[ResolverResource] = {
for {
- pc <- fetchContext.onCreate(projectRef)
- iri <- expandIri(id, pc)
+ pc <- fetchContext.onCreate(projectRef).toCatsIO
+ iri <- expandIri(id, pc).toCatsIO
source = ResolverValue.generateSource(iri, resolverValue)
res <- eval(CreateResolver(iri, projectRef, resolverValue, source, caller))
} yield res
@@ -81,9 +81,9 @@
source: Json
)(implicit caller: Caller): IO[ResolverResource] = {
for {
- pc <- fetchContext.onModify(projectRef)
- iri <- expandIri(id, pc)
- resolverValue <- sourceDecoder(projectRef, pc, iri, source)
+ pc <- fetchContext.onModify(projectRef).toCatsIO
+ iri <- expandIri(id, pc).toCatsIO
+ resolverValue <- sourceDecoder(projectRef, pc, iri, source).toCatsIO
res <- eval(UpdateResolver(iri, projectRef, resolverValue, source, rev, caller))
} yield res
}.span("updateResolver")
@@ -97,8 +97,8 @@
caller: Caller
): IO[ResolverResource] = {
for {
- pc <- fetchContext.onModify(projectRef)
- iri <- expandIri(id, pc)
+ pc <- fetchContext.onModify(projectRef).toCatsIO
+ iri <- expandIri(id, pc).toCatsIO
source = ResolverValue.generateSource(iri, resolverValue)
res <- eval(UpdateResolver(iri, projectRef, resolverValue, source, rev, caller))
} yield res
@@ -114,8 +114,8 @@
subject: Identity.Subject
): IO[ResolverResource] = {
for {
- pc <- fetchContext.onModify(projectRef)
- iri <- expandIri(id, pc)
+ pc <- fetchContext.onModify(projectRef).toCatsIO
+ iri <- expandIri(id, pc).toCatsIO
res <- eval(TagResolver(iri, projectRef, tagRev, tag, rev, subject))
} yield res
}.span("tagResolver")
@@ -126,16 +126,16 @@
rev: Int
)(implicit subject: Identity.Subject): IO[ResolverResource] = {
for {
- pc <- fetchContext.onModify(projectRef)
- iri <- expandIri(id, pc)
+ pc <- fetchContext.onModify(projectRef).toCatsIO
+ iri <- expandIri(id, pc).toCatsIO
res <- eval(DeprecateResolver(iri, projectRef, rev, subject))
} yield res
}.span("deprecateResolver")

override def fetch(id: IdSegmentRef, projectRef: ProjectRef): IO[ResolverResource] = {
for {
- pc <- fetchContext.onRead(projectRef)
- iri <- expandIri(id.value, pc)
+ pc <- fetchContext.onRead(projectRef).toCatsIO
+ iri <- expandIri(id.value, pc).toCatsIO
notFound = ResolverNotFound(iri, projectRef)
state <- id match {
case Latest(_) => log.stateOr(projectRef, iri, notFound)
@@ -154,13 +154,13 @@
): IO[UnscoredSearchResults[ResolverResource]] = {
val scope = params.project.fold[Scope](Scope.Root)(ref => Scope.Project(ref))
SearchResults(
- log.currentStates(scope, _.toResource).evalFilter(params.matches(_).toUIO),
+ log.currentStates(scope, _.toResource).evalFilter(params.matches),
pagination,
ordering
).span("listResolvers")
}

- private def eval(cmd: ResolverCommand) =
+ private def eval(cmd: ResolverCommand): IO[ResolverResource] =
log.evaluate(cmd.project, cmd.id, cmd).map(_._2.toResource)
}

@@ -89,7 +89,7 @@ final class ResourcesImpl private (
for {
(iri, projectContext) <- expandWithContext(fetchContext.onModify, projectRef, id)
schemaRef <- IO.fromEither(expandResourceRef(schema, projectContext))
- resource <- log.stateOr(projectRef, iri, ResourceNotFound(iri, projectRef)).toCatsIO
+ resource <- log.stateOr(projectRef, iri, ResourceNotFound(iri, projectRef))
res <- if (schemaRef.iri == resource.schema.iri) fetch(id, projectRef, schema.some)
else eval(UpdateResourceSchema(iri, projectRef, schemaRef, resource.expanded, resource.rev, caller))
} yield res
@@ -103,7 +103,7 @@
for {
(iri, projectContext) <- expandWithContext(fetchContext.onModify, projectRef, id)
schemaRefOpt <- IO.fromEither(expandResourceRef(schemaOpt, projectContext))
- resource <- log.stateOr(projectRef, iri, ResourceNotFound(iri, projectRef)).toCatsIO
+ resource <- log.stateOr(projectRef, iri, ResourceNotFound(iri, projectRef))
jsonld <- sourceParser(projectRef, projectContext, iri, resource.source).toCatsIO
res <- eval(RefreshResource(iri, projectRef, schemaRefOpt, jsonld, resource.rev, caller))
} yield res
@@ -173,11 +173,11 @@
} yield state
}.span("fetchResource")

- private def stateOrNotFound(id: IdSegmentRef, iri: Iri, ref: ProjectRef): IO[ResourceState] = (id match {
+ private def stateOrNotFound(id: IdSegmentRef, iri: Iri, ref: ProjectRef): IO[ResourceState] = id match {
case Latest(_) => log.stateOr(ref, iri, notFound(iri, ref))
case Revision(_, rev) => log.stateOr(ref, iri, rev, notFound(iri, ref), RevisionNotFound)
case Tag(_, tag) => log.stateOr(ref, iri, tag, notFound(iri, ref), TagNotFound(tag))
- }).toCatsIO
+ }

private def notFound(iri: Iri, ref: ProjectRef) = ResourceNotFound(iri, ref)

[Diff truncated: the remaining changed files are not shown here]
