diff --git a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/MultiFetchModule.scala b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/MultiFetchModule.scala index dae425ae9a..daf47c1314 100644 --- a/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/MultiFetchModule.scala +++ b/delta/app/src/main/scala/ch/epfl/bluebrain/nexus/delta/wiring/MultiFetchModule.scala @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri import ch.epfl.bluebrain.nexus.delta.sdk.multifetch.MultiFetch import ch.epfl.bluebrain.nexus.delta.sdk.multifetch.model.MultiFetchRequest import ch.epfl.bluebrain.nexus.delta.sdk.{PriorityRoute, ResourceShifts} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import distage.ModuleDef import izumi.distage.model.definition.Id import monix.execution.Scheduler @@ -23,7 +24,7 @@ object MultiFetchModule extends ModuleDef { ) => MultiFetch( aclCheck, - (input: MultiFetchRequest.Input) => shifts.fetch(input.id, input.project) + (input: MultiFetchRequest.Input) => shifts.fetch(input.id, input.project).toUIO ) } diff --git a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala index c1c4d6d25a..f9c2b1fdba 100644 --- a/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala +++ b/delta/kernel/src/main/scala/ch/epfl/bluebrain/nexus/delta/kernel/effect/migration/MigrateEffectSyntax.scala @@ -41,4 +41,6 @@ final class CatsIOToBioOps[A](private val io: IO[A]) extends AnyVal { } def toUIO: UIO[A] = BIO.from(io).hideErrors + + def toTask: Task[A] = Task.from(io) } diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/BlazegraphView.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/BlazegraphView.scala index 0c4be12686..7f82f3ce15 100644 --- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/BlazegraphView.scala +++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/model/BlazegraphView.scala @@ -17,6 +17,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto.deriveConfiguredEncoder +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import io.circe.syntax._ import io.circe.{Encoder, Json, JsonObject} @@ -196,7 +197,7 @@ object BlazegraphView { def shift(views: BlazegraphViews)(implicit baseUri: BaseUri): Shift = ResourceShift.withMetadata[BlazegraphViewState, BlazegraphView, Metadata]( BlazegraphViews.entityType, - (ref, project) => views.fetch(IdSegmentRef(ref), project), + (ref, project) => views.fetch(IdSegmentRef(ref), project).toCatsIO, state => state.toResource, value => JsonLdContent(value, value.value.source, Some(value.value.metadata)) ) diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/model/CompositeView.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/model/CompositeView.scala index ccfabdea33..18d68166dd 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/model/CompositeView.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/model/CompositeView.scala @@ -27,6 +27,7 @@ import java.time.Instant import java.util.UUID import scala.annotation.nowarn import scala.concurrent.duration.FiniteDuration +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ /** * Representation of a composite view. @@ -170,7 +171,7 @@ object CompositeView { def shift(views: CompositeViews)(implicit baseUri: BaseUri): Shift = ResourceShift.withMetadata[CompositeViewState, CompositeView, Metadata]( CompositeViews.entityType, - (ref, project) => views.fetch(IdSegmentRef(ref), project), + (ref, project) => views.fetch(IdSegmentRef(ref), project).toCatsIO, state => state.toResource, value => JsonLdContent(value, value.value.source, Some(value.value.metadata)) ) diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala index 3904adacc6..7ae1b1487d 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala @@ -270,7 +270,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { } make[IdResolution].from { (defaultViewsQuery: DefaultViewsQuery.Elasticsearch, shifts: ResourceShifts) => - new IdResolution(defaultViewsQuery, (resourceRef, projectRef) => shifts.fetch(resourceRef, projectRef)) + new IdResolution(defaultViewsQuery, (resourceRef, projectRef) => shifts.fetch(resourceRef, projectRef).toUIO) } make[IdResolutionRoutes].from { diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/model/ElasticSearchView.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/model/ElasticSearchView.scala index fde5b2d5c2..a6b146f46c 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/model/ElasticSearchView.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/model/ElasticSearchView.scala @@ -26,6 +26,7 @@ import io.circe.parser.parse import io.circe.syntax._ import io.circe.{Encoder, Json, JsonObject} import monix.bio.IO +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import java.util.UUID import scala.annotation.nowarn @@ -276,7 +277,7 @@ object ElasticSearchView { ): Shift = ResourceShift.withMetadata[ElasticSearchViewState, ElasticSearchView, Metadata]( ElasticSearchViews.entityType, - (ref, project) => views.fetch(IdSegmentRef(ref), project), + (ref, project) => views.fetch(IdSegmentRef(ref), project).toCatsIO, state => state.toResource(defaultMapping, defaultSettings), value => JsonLdContent(value, value.value.source, Some(value.value.metadata)) ) diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/File.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/File.scala index a9d21f2fc4..758baec542 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/File.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/model/File.scala @@ -16,6 +16,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.ResourceRef import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag import io.circe.syntax._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import io.circe.{Encoder, Json} /** @@ -74,7 +75,7 @@ object File { def shift(files: Files)(implicit baseUri: BaseUri, config: StorageTypeConfig): Shift = ResourceShift.withMetadata[FileState, File, Metadata]( Files.entityType, - (ref, project) => files.fetch(IdSegmentRef(ref), project), + (ref, project) => files.fetch(IdSegmentRef(ref), project).toCatsIO, state => state.toResource, value => JsonLdContent(value, value.value.asJson, Some(value.value.metadata)) ) diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala index 35a256462b..f249a3344c 100644 --- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala +++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/model/Storage.scala @@ -14,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegmentRef, Tags} import ch.epfl.bluebrain.nexus.delta.sdk.{OrderingFields, ResourceShift} @@ -173,7 +174,7 @@ object Storage { def shift(storages: Storages)(implicit baseUri: BaseUri): Shift = ResourceShift.withMetadata[StorageState, Storage, Metadata]( Storages.entityType, - (ref, project) => storages.fetch(IdSegmentRef(ref), project), + (ref, project) => storages.fetch(IdSegmentRef(ref), project).toCatsIO, state => state.toResource, value => JsonLdContent(value, value.value.source, Some(value.value.metadata)) ) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/IndexingAction.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/IndexingAction.scala index 2682ad24b2..fc4548fa84 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/IndexingAction.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/IndexingAction.scala @@ -98,7 +98,7 @@ object IndexingAction { for { _ <- logger.debug(s"Synchronous indexing of resource '$project/${res.id}' has been requested.") // We create the GraphResource wrapped in an `Elem` - elem <- toCatsIO(shift.toGraphResourceElem(project, res)) + elem <- shift.toGraphResourceElem(project, res) errorsPerAction <- internal.traverse(_.apply(project, elem)) errors = errorsPerAction.toList.flatMap(_.map(_.throwable)) _ <- IO.raiseWhen(errors.nonEmpty)(IndexingFailed(res.void, errors)) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala index 875bf2f102..47f36b9125 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShift.scala @@ -1,5 +1,6 @@ package ch.epfl.bluebrain.nexus.delta.sdk +import cats.effect.IO import cats.syntax.all._ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} @@ -9,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, ResourceF} import ch.epfl.bluebrain.nexus.delta.sdk.syntax._ +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import ch.epfl.bluebrain.nexus.delta.sourcing.Serializer import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, ResourceRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset @@ -17,7 +19,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.ScopedState import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{FailedElem, SuccessElem} import io.circe.Json -import monix.bio.{IO, Task, UIO} /** * Defines common operations to retrieve the different resources in a common format for tasks like indexing or @@ -44,7 +45,7 @@ import monix.bio.{IO, Task, UIO} */ abstract class ResourceShift[State <: ScopedState, A, M]( val entityType: EntityType, - fetchResource: (ResourceRef, ProjectRef) => UIO[Option[ResourceF[A]]], + fetchResource: (ResourceRef, ProjectRef) => IO[Option[ResourceF[A]]], valueEncoder: JsonLdEncoder[A], metadataEncoder: Option[JsonLdEncoder[M]] )(implicit serializer: Serializer[_, State], baseUri: BaseUri) { @@ -61,7 +62,7 @@ abstract class ResourceShift[State <: ScopedState, A, M]( * @return * the resource with its original source, its metadata and its encoder */ - def fetch(reference: ResourceRef, project: ProjectRef): UIO[Option[JsonLdContent[A, M]]] = + def fetch(reference: ResourceRef, project: ProjectRef): IO[Option[JsonLdContent[A, M]]] = fetchResource(reference, project).map(_.map(resourceToContent)) protected def resourceToContent(value: ResourceF[A]): JsonLdContent[A, M] @@ -71,30 +72,30 @@ abstract class ResourceShift[State <: ScopedState, A, M]( */ def toGraphResource(json: Json)(implicit cr: RemoteContextResolution - ): Task[GraphResource] = + ): IO[GraphResource] = for { - state <- Task.fromEither(serializer.codec.decodeJson(json)) + state <- IO.fromEither(serializer.codec.decodeJson(json)) resource = toResourceF(state) graph <- toGraphResource(state.project, resource) } yield graph def toGraphResourceElem(project: ProjectRef, resource: ResourceF[A])(implicit cr: RemoteContextResolution - ): UIO[Elem[GraphResource]] = toGraphResource(project, resource).redeem( + ): IO[Elem[GraphResource]] = toGraphResource(project, resource).redeem( err => FailedElem(entityType, resource.id, Some(project), resource.updatedAt, Offset.Start, err, resource.rev), graph => SuccessElem(entityType, resource.id, Some(project), resource.updatedAt, Offset.Start, graph, resource.rev) ) private def toGraphResource(project: ProjectRef, resource: ResourceF[A])(implicit cr: RemoteContextResolution - ): Task[GraphResource] = { + ): IO[GraphResource] = { val content = resourceToContent(resource) val metadata = content.metadata val id = resource.resolvedId for { - graph <- valueJsonLdEncoder.graph(resource.value) + graph <- valueJsonLdEncoder.graph(resource.value).toCatsIO rootGraph = graph.replaceRootNode(id) - resourceMetaGraph <- resourceFJsonLdEncoder.graph(resource.void) + resourceMetaGraph <- resourceFJsonLdEncoder.graph(resource.void).toCatsIO metaGraph <- encodeMetadata(id, metadata) rootMetaGraph = metaGraph.fold(resourceMetaGraph)(_ ++ resourceMetaGraph) typesGraph = rootMetaGraph.rootTypesGraph @@ -115,8 +116,8 @@ abstract class ResourceShift[State <: ScopedState, A, M]( private def encodeMetadata(id: Iri, metadata: Option[M])(implicit cr: RemoteContextResolution) = (metadata, metadataEncoder) match { - case (Some(m), Some(e)) => e.graph(m).map { g => Some(g.replaceRootNode(id)) } - case (_, _) => UIO.none + case (Some(m), Some(e)) => e.graph(m).toCatsIO.map { g => Some(g.replaceRootNode(id)) } + case (_, _) => IO.none } } @@ -142,7 +143,7 @@ object ResourceShift { */ def withMetadata[State <: ScopedState, A, M]( entityType: EntityType, - fetchResource: (ResourceRef, ProjectRef) => IO[_, ResourceF[A]], + fetchResource: (ResourceRef, ProjectRef) => IO[ResourceF[A]], stateToResource: State => ResourceF[A], asContent: ResourceF[A] => JsonLdContent[A, M] )(implicit @@ -180,7 +181,7 @@ object ResourceShift { */ def apply[State <: ScopedState, B]( entityType: EntityType, - fetchResource: (ResourceRef, ProjectRef) => IO[_, ResourceF[B]], + fetchResource: (ResourceRef, ProjectRef) => IO[ResourceF[B]], stateToResource: State => ResourceF[B], asContent: ResourceF[B] => JsonLdContent[B, Nothing] )(implicit diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShifts.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShifts.scala index 95762a3a61..b7cf43143b 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShifts.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/ResourceShifts.scala @@ -1,15 +1,16 @@ package ch.epfl.bluebrain.nexus.delta.sdk +import cats.effect.IO import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, ResourceRef} import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource import ch.epfl.bluebrain.nexus.delta.sourcing.{EntityCheck, Transactors} -import com.typesafe.scalalogging.Logger import io.circe.Json -import monix.bio.{IO, Task, UIO} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ /** * Aggregates the different [[ResourceShift]] to perform operations on resources independently of their types @@ -19,18 +20,18 @@ trait ResourceShifts { /** * Fetch a resource as a [[JsonLdContent]] */ - def fetch(reference: ResourceRef, project: ProjectRef): UIO[Option[JsonLdContent[_, _]]] + def fetch(reference: ResourceRef, project: ProjectRef): IO[Option[JsonLdContent[_, _]]] /** * Return a function to decode a json to a [[GraphResource]] according to its [[EntityType]] */ - def decodeGraphResource: (EntityType, Json) => Task[GraphResource] + def decodeGraphResource: (EntityType, Json) => IO[GraphResource] } object ResourceShifts { - private val logger: Logger = Logger[ResourceShifts] + private val logger = Logger.cats[ResourceShifts] private case class NoShiftAvailable(entityType: EntityType) extends Exception(s"No shift is available for entity type $entityType") @@ -40,29 +41,27 @@ object ResourceShifts { ): ResourceShifts = new ResourceShifts { private val shiftsMap = shifts.map { encoder => encoder.entityType -> encoder }.toMap - private def findShift(entityType: EntityType): UIO[ResourceShift[_, _, _]] = IO - .fromOption( - shiftsMap.get(entityType), + private def findShift(entityType: EntityType): IO[ResourceShift[_, _, _]] = IO + .fromOption(shiftsMap.get(entityType))( NoShiftAvailable(entityType) ) - .hideErrors - override def fetch(reference: ResourceRef, project: ProjectRef): UIO[Option[JsonLdContent[_, _]]] = + override def fetch(reference: ResourceRef, project: ProjectRef): IO[Option[JsonLdContent[_, _]]] = for { - entityType <- EntityCheck.findType(reference.iri, project, xas) + entityType <- EntityCheck.findType(reference.iri, project, xas).toCatsIO shift <- entityType.traverse(findShift) resource <- shift.flatTraverse(_.fetch(reference, project)) } yield resource - override def decodeGraphResource: (EntityType, Json) => Task[GraphResource] = { + override def decodeGraphResource: (EntityType, Json) => IO[GraphResource] = { (entityType: EntityType, json: Json) => { for { shift <- findShift(entityType) result <- shift.toGraphResource(json) } yield result - }.tapError { err => - UIO.delay(logger.error(s"Entity of type '$entityType' could not be decoded as a graph resource", err)) + }.onError { err => + logger.error(err)(s"Entity of type '$entityType' could not be decoded as a graph resource") } } } diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/model/Project.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/model/Project.scala index 7b71a549a7..e98959c98c 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/model/Project.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/model/Project.scala @@ -10,6 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.Projects import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.Project.{Metadata, Source} import ch.epfl.bluebrain.nexus.delta.sdk.{OrderingFields, ResourceShift} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import io.circe.Encoder import io.circe.generic.extras.Configuration import io.circe.generic.extras.semiauto.deriveConfiguredEncoder @@ -163,7 +164,7 @@ object Project { def shift(projects: Projects, defaultMappings: ApiMappings)(implicit baseUri: BaseUri): Shift = ResourceShift.withMetadata[ProjectState, Project, Metadata]( Projects.entityType, - (_, ref) => projects.fetch(ref), + (_, ref) => projects.fetch(ref).toCatsIO, state => state.toResource(defaultMappings), value => JsonLdContent(value, value.value.asJson, Some(value.value.metadata)) ) diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/GraphResourceStream.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/GraphResourceStream.scala index 9ed7e1d5dd..5ada289f74 100644 --- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/GraphResourceStream.scala +++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/stream/GraphResourceStream.scala @@ -8,6 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, SelectFilter, StreamingQuery} import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource import ch.epfl.bluebrain.nexus.delta.sourcing.stream.RemainingElems +import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._ import fs2.Stream import monix.bio.{Task, UIO} @@ -77,7 +78,7 @@ object GraphResourceStream { ): GraphResourceStream = new GraphResourceStream { override def continuous(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ElemStream[GraphResource] = - StreamingQuery.elems(project, start, selectFilter, qc, xas, shifts.decodeGraphResource) + StreamingQuery.elems(project, start, selectFilter, qc, xas, shifts.decodeGraphResource(_, _).toTask) override def currents(project: ProjectRef, selectFilter: SelectFilter, start: Offset): ElemStream[GraphResource] = StreamingQuery.elems( @@ -86,7 +87,7 @@ object GraphResourceStream { selectFilter, qc.copy(refreshStrategy = RefreshStrategy.Stop), xas, - shifts.decodeGraphResource + shifts.decodeGraphResource(_, _).toTask ) override def remaining(