Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Envelope with Elem.SuccessElem #4629

Merged
merged 11 commits into from
Jan 12, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object BlazegraphDeletionTask {

def apply(views: BlazegraphViews) =
new BlazegraphDeletionTask(
project => views.currentIndexingViews(project).evalMapFilter(_.toIO),
project => views.currentIndexingViews(project).map(_.value),
(v: ActiveViewDef, subject: Subject) =>
views.internalDeprecate(v.ref.viewId, v.ref.project, v.rev)(subject).handleErrorWith { r =>
logger.error(s"Deprecating '$v' resulted in error: '$r'.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.{BlazegraphSink
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand All @@ -27,7 +27,7 @@ import scala.concurrent.duration.FiniteDuration
* a maximum duration for the indexing
*/
final class BlazegraphIndexingAction(
fetchCurrentViews: ProjectRef => ElemStream[IndexingViewDef],
fetchCurrentViews: ProjectRef => SuccessElemStream[IndexingViewDef],
compilePipeChain: PipeChain => Either[ProjectionErr, Operation],
sink: ActiveViewDef => Sink,
override val timeout: FiniteDuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import io.circe.Json

import java.util.UUID
Expand Down Expand Up @@ -318,29 +319,21 @@ final class BlazegraphViews(
/**
* Return the existing indexing views in a project in a finite stream
*/
def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] =
log.currentStates(Scope.Project(project)).evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
}

/**
* Return all existing indexing views in a finite stream
*/
def currentIndexingViews: ElemStream[IndexingViewDef] =
log.currentStates(Scope.Root).evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
def currentIndexingViews(project: ProjectRef): SuccessElemStream[IndexingViewDef] =
log.currentStates(Scope.Project(project)).evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

/**
* Return the indexing views in a non-ending stream
*/
def indexingViews(start: Offset): ElemStream[IndexingViewDef] =
log.states(Scope.Root, start).evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
def indexingViews(start: Offset): SuccessElemStream[IndexingViewDef] =
log.states(Scope.Root, start).evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

private def toIndexViewDef(envelope: Envelope[BlazegraphViewState]) =
envelope.toElem { v => Some(v.project) }.traverse { v =>
private def toIndexViewDef(elem: Elem.SuccessElem[BlazegraphViewState]) =
elem.withProject(elem.value.project).traverse { v =>
Copy link
Contributor Author

@dantb dantb Jan 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't feel great that this is optional - most of the concrete types for which we're querying elems have a project and it's a mandatory field in the scoped tables. I guess it's because Envelope was used for both global and scoped?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's confusing how the elem wouldn't already have the project set since it's coming from within it..?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to populate this field from the original SQL query.
Yes, it was because Envelope and Elem were designed to work for both global and scoped entities.
Not sure if there is still a use case for global entities though now that we got rid of the sse events for them.

IndexingViewDef(v, prefix)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing.IndexingViewDef
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.SuccessElemStream
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand Down Expand Up @@ -41,7 +41,7 @@ object BlazegraphCoordinator {
* the general supervisor
*/
final private class Active(
fetchViews: Offset => ElemStream[IndexingViewDef],
fetchViews: Offset => SuccessElemStream[IndexingViewDef],
graphStream: GraphResourceStream,
compilePipeChain: PipeChain => Either[ProjectionErr, Operation],
cache: LocalCache[ViewRef, ActiveViewDef],
Expand Down Expand Up @@ -145,7 +145,7 @@ object BlazegraphCoordinator {
}

def apply(
fetchViews: Offset => ElemStream[IndexingViewDef],
fetchViews: Offset => SuccessElemStream[IndexingViewDef],
graphStream: GraphResourceStream,
compilePipeChain: PipeChain => Either[ProjectionErr, Operation],
supervisor: Supervisor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.PullRequestActive
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
Expand Down Expand Up @@ -75,9 +75,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
ViewRef(project, id4)
)

private val id5 = nxv + "view5"

private def viewStream: ElemStream[IndexingViewDef] =
private def viewStream: SuccessElemStream[IndexingViewDef] =
Stream(
SuccessElem(
tpe = BlazegraphViews.entityType,
Expand Down Expand Up @@ -114,14 +112,6 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
offset = Offset.at(4L),
value = view4,
rev = 1
),
DroppedElem(
tpe = BlazegraphViews.entityType,
id = id5,
project = Some(project),
Instant.EPOCH,
Offset.at(5L),
rev = 1
)
)

Expand Down Expand Up @@ -159,7 +149,7 @@ class BlazegraphIndexingActionSuite extends NexusSuite with Fixtures {
)

test("Collect only the adequate views") {
val expected = IdAcc(Set(id1), Set(id2, id4, id5), Set(id3))
val expected = IdAcc(Set(id1), Set(id2, id4), Set(id3))

indexingAction
.projections(project, elem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, SuccessElemStream}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.query.SelectFilter
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.CouldNotFindPipeErr
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.SupervisorSetup.unapply
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand Down Expand Up @@ -65,7 +65,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
ViewRef(project, id3),
projection = id3.toString,
SelectFilter.latest,
Some(PipeChain(PipeRef.unsafe("xxx") -> ExpandedJsonLd.empty)),
Some(PipeChain(unknownPipe -> ExpandedJsonLd.empty)),
namespace = "view3",
indexingRev,
rev
Expand All @@ -86,8 +86,8 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture

private val resumeSignal = SignallingRef[IO, Boolean](false).unsafeRunSync()

// Streams 4 elements until signal is set to true and then a failed item, 1 updated view and 1 deprecated view
private def viewStream: ElemStream[IndexingViewDef] =
// Streams 3 elements until signal is set to true, then 1 updated view and 1 deprecated view
private def viewStream: SuccessElemStream[IndexingViewDef] =
Stream(
SuccessElem(
tpe = BlazegraphViews.entityType,
Expand All @@ -98,20 +98,12 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
value = view1,
rev = 1
),
DroppedElem(
tpe = BlazegraphViews.entityType,
id = nxv + "dropped",
project = Some(project),
Instant.EPOCH,
Offset.at(2L),
rev = 1
),
SuccessElem(
tpe = BlazegraphViews.entityType,
id = view2.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(3L),
offset = Offset.at(2L),
value = view2,
rev = 1
),
Expand All @@ -120,26 +112,17 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
id = view3.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(4L),
offset = Offset.at(3L),
value = view3,
rev = 1
)
) ++ Stream.never[IO].interruptWhen(resumeSignal) ++ Stream(
FailedElem(
tpe = BlazegraphViews.entityType,
id = nxv + "failed_coord",
project = Some(project),
Instant.EPOCH,
Offset.at(5L),
new IllegalStateException("Something got wrong :("),
rev = 1
),
SuccessElem(
tpe = BlazegraphViews.entityType,
id = deprecatedView1.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(6L),
offset = Offset.at(4L),
value = deprecatedView1,
rev = 1
),
Expand All @@ -148,17 +131,17 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
id = updatedView2.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(7L),
offset = Offset.at(5L),
value = updatedView2,
rev = 1
),
// Elem at offset 8 represents a view update that does not require reindexing
// Elem at offset 6 represents a view update that does not require reindexing
SuccessElem(
tpe = BlazegraphViews.entityType,
id = updatedView2.ref.viewId,
project = Some(project),
instant = Instant.EPOCH,
offset = Offset.at(8L),
offset = Offset.at(6L),
value = updatedView2,
rev = 1
)
Expand Down Expand Up @@ -187,7 +170,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
)
_ <- sv.describe(BlazegraphCoordinator.metadata.name)
.map(_.map(_.progress))
.assertEquals(Some(ProjectionProgress(Offset.at(4L), Instant.EPOCH, 4, 1, 1)))
.assertEquals(Some(ProjectionProgress(Offset.at(3L), Instant.EPOCH, 3, 0, 1)))
.eventually
} yield ()
}
Expand Down Expand Up @@ -268,7 +251,7 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
_ <- resumeSignal.set(true)
_ <- sv.describe(BlazegraphCoordinator.metadata.name)
.map(_.map(_.progress))
.assertEquals(Some(ProjectionProgress(Offset.at(8L), Instant.EPOCH, 8, 1, 2)))
.assertEquals(Some(ProjectionProgress(Offset.at(6L), Instant.EPOCH, 6, 0, 1)))
.eventually
} yield ()
}
Expand Down Expand Up @@ -311,17 +294,9 @@ class BlazegraphCoordinatorSuite extends NexusSuite with SupervisorSetup.Fixture
} yield ()
}

test("Coordinator projection should have one error after failed elem offset 3") {
for {
entries <- projectionErrors.failedElemEntries(BlazegraphCoordinator.metadata.name, Offset.At(3L)).compile.toList
r = entries.assertOneElem
_ = assertEquals(r.failedElemData.id, nxv + "failed_coord")
} yield ()
}

test("View 2_2 projection should have one error after failed elem offset 3") {
test("View 2_2 projection should have one error after failed elem offset 2") {
for {
entries <- projectionErrors.failedElemEntries(updatedView2.projection, Offset.At(3L)).compile.toList
entries <- projectionErrors.failedElemEntries(updatedView2.projection, Offset.At(2L)).compile.toList
r = entries.assertOneElem
_ = assertEquals(r.failedElemData.id, nxv + "failed")
} yield ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import io.circe.Json

/**
Expand Down Expand Up @@ -332,20 +333,14 @@ final class CompositeViews private (
def currentViews(project: ProjectRef): ElemStream[CompositeViewDef] =
log.currentStates(Scope.Project(project)).map(toCompositeViewDef)

/**
* Return all existing indexing views in a finite stream
*/
def currentViews: ElemStream[CompositeViewDef] =
log.currentStates(Scope.Root).map(toCompositeViewDef)

/**
* Return the indexing views in a non-ending stream
*/
def views(start: Offset): ElemStream[CompositeViewDef] =
log.states(Scope.Root, start).map(toCompositeViewDef)

private def toCompositeViewDef(envelope: Envelope[CompositeViewState]) =
envelope.toElem { v => Some(v.project) }.map { v =>
private def toCompositeViewDef(elem: Elem.SuccessElem[CompositeViewState]) =
elem.withProject(elem.value.project).mapValue { v =>
CompositeViewDef(v)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSear
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.IndexingAction
import ch.epfl.bluebrain.nexus.delta.sourcing.config.BatchConfig
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef, SuccessElemStream, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
Expand All @@ -28,7 +28,7 @@ import scala.concurrent.duration.FiniteDuration
* a maximum duration for the indexing
*/
final class ElasticSearchIndexingAction(
fetchCurrentViews: ProjectRef => ElemStream[IndexingViewDef],
fetchCurrentViews: ProjectRef => SuccessElemStream[IndexingViewDef],
compilePipeChain: PipeChain => Either[ProjectionErr, Operation],
sink: ActiveViewDef => Sink,
override val timeout: FiniteDuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem
import io.circe.Json

import java.util.UUID
Expand Down Expand Up @@ -367,35 +368,35 @@ final class ElasticSearchViews private (
/**
* Return the existing indexing views in a project in a finite stream
*/
def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] =
def currentIndexingViews(project: ProjectRef): SuccessElemStream[IndexingViewDef] =
log
.currentStates(Scope.Project(project))
.evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
.evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

/**
* Return all existing indexing views in a finite stream
*/
def currentIndexingViews: ElemStream[IndexingViewDef] =
def currentIndexingViews: SuccessElemStream[IndexingViewDef] =
log
.currentStates(Scope.Root)
.evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
.evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

/**
* Return the indexing views in a non-ending stream
*/
def indexingViews(start: Offset): ElemStream[IndexingViewDef] =
def indexingViews(start: Offset): SuccessElemStream[IndexingViewDef] =
log
.states(Scope.Root, start)
.evalMapFilter { envelope =>
IO.pure(toIndexViewDef(envelope))
.evalMapFilter { elem =>
IO.pure(toIndexViewDef(elem))
}

private def toIndexViewDef(envelope: Envelope[ElasticSearchViewState]) =
envelope.toElem { v => Some(v.project) }.traverse { v =>
private def toIndexViewDef(elem: Elem.SuccessElem[ElasticSearchViewState]) =
elem.withProject(elem.value.project).traverse { v =>
IndexingViewDef(v, defaultElasticsearchMapping, defaultElasticsearchSettings, prefix)
}

Expand Down
Loading