Skip to content

Commit

Permalink
Update to Scala 2.13.15 (#5156)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Sep 27, 2024
1 parent 772ebfb commit 1531189
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 94 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ scalafmt: {
}
*/

val scalacScapegoatVersion = "2.1.6"
val scalaCompilerVersion = "2.13.14"
val scalacScapegoatVersion = "3.0.2"
val scalaCompilerVersion = "2.13.15"

val akkaHttpVersion = "10.2.10"
val akkaHttpCirceVersion = "1.39.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,7 @@ object CompositeViewDef {
}

/**
* Complete and compiles the operation for the branch applying a leap depending on the current offset and the final
* operation
* Complete and compiles the operation for the branch
*
* @param progress
* the composite progress
Expand All @@ -491,7 +490,6 @@ object CompositeViewDef {
operation: Operation,
closeBranch: (CompositeBranch, ProjectionProgress) => Operation
): Either[ProjectionErr, Operation] = {
//TODO Add leap on target
val branchProgress = progress.branches.get(branch)
Operation
.merge(operation, closeBranch(branch, branchProgress.getOrElse(ProjectionProgress.NoProgress)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import cats.effect.IO
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ElemPipe
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, FailedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.{LeapingNotAllowedErr, OperationInOutMatchErr}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.OperationInOutMatchErr
import fs2.{Chunk, Pipe, Pull, Stream}
import shapeless.Typeable

Expand Down Expand Up @@ -110,60 +109,6 @@ sealed trait Operation { self =>
_.through(self.asFs2).debug(formatter, logger)

}

/**
* Do not apply the operation until the given offset is reached
* @param offset
* the offset to reach before applying the operation
* @param mapSkip
* the function to apply when skipping an element
*/
def leap[A](offset: Offset, mapSkip: self.In => A)(implicit ta: Typeable[A]): Either[ProjectionErr, Operation] = {
val pipe = new Operation {
override type In = self.In
override type Out = self.Out

override def name: String = "Leap"

override def inType: Typeable[In] = self.inType

override def outType: Typeable[Out] = self.outType

override protected[stream] def asFs2: Pipe[IO, Elem[Operation.this.In], Elem[this.Out]] = {
def go(s: fs2.Stream[IO, Elem[In]]): Pull[IO, Elem[this.Out], Unit] = {
s.pull.peek.flatMap {
case Some((chunk, stream)) =>
val (before, after) = chunk.partitionEither { e =>
Either.cond(Offset.offsetOrder.gt(e.offset, offset), e, e)
}
for {
evaluated <- Pull.eval(Stream.chunk(after).through(self.asFs2).compile.to(Chunk))
all = Chunk.concat(Seq(before.map(_.map(mapSkip)), evaluated)).map {
_.attempt { value => this.outType.cast(value).toRight(LeapingNotAllowedErr(self, ta)) }
}
_ <- Pull.output(all)
next <- go(stream.tail)
} yield next
case None => Pull.done
}
}
in => go(in).stream
}
}

Either.cond(
ta.describe == self.outType.describe,
pipe,
LeapingNotAllowedErr(self, ta)
)
}

/**
* Leap applying the identity function to skipped elements
* @param offset
* the offset to reach before applying the operation
*/
def identityLeap(offset: Offset): Either[ProjectionErr, Operation] = leap(offset, identity[self.In])(inType)
}

object Operation {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.sourcing.stream

import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd
import shapeless.Typeable

/**
* Enumeration of recoverable errors for projection/stream definitions and compositions.
Expand Down Expand Up @@ -81,14 +80,6 @@ object ProjectionErr {
s"Unable to match Out type '${self.outType.describe}' of operation '${self.name}' to the In type '${that.inType.describe}' of operation '${that.name}'"
}

/**
* Leaping is only possible for an operation when we provide a skip function that aligns to the out type
*/
final case class LeapingNotAllowedErr[A](self: Operation, skip: Typeable[A]) extends ProjectionErr {
override def reason: String =
s"Unable to leap on operation '${self.name}' as skip type '${skip.describe}' does not match Out type '${self.outType.describe}'."
}

/**
* A pipe definition can be looked up in the [[ReferenceRegistry]] using a reference. This error signals a failed
* lookup attempt.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Pipe
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.OperationSuite.{double, half, until}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.{LeapingNotAllowedErr, OperationInOutMatchErr}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionErr.OperationInOutMatchErr
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.GenericPipe
import ch.epfl.bluebrain.nexus.testkit.mu.NexusSuite
import fs2.Stream
import shapeless.Typeable

import java.time.Instant

Expand Down Expand Up @@ -65,26 +64,6 @@ class OperationSuite extends NexusSuite {
_ = assertEquals(sink2.successes.values.toList.sorted, List(0, 1, 2, 3, 4))
} yield ()
}

test("Run the double stream with a leaped part") {
val sink = CacheSink.states[Int]

val first = double.identityLeap(Offset.at(2L)).rightValue
val second = Operation.merge(double, sink).rightValue
val all = Operation.merge(first, second).rightValue

for {
_ <- until(5).through(all).rightValue.apply(Offset.Start).assertSize(5)
_ = assert(sink.failed.isEmpty, "No failure should be detected.")
_ = assert(sink.dropped.isEmpty, "No dropped should be detected.")
_ = assertEquals(sink.successes.values.toList.sorted, List(0, 2, 4, 12, 16))
} yield ()
}

test("Leaping is not possible for an operation where in and out are not aligned") {
half.identityLeap(Offset.at(2L)).assertLeft(LeapingNotAllowedErr(half, Typeable[Int]))
}

}

object OperationSuite {
Expand Down
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2")

addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.1.1")
addSbtPlugin("com.sksamuel.scapegoat" % "sbt-scapegoat" % "1.2.4")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.2.1")
addSbtPlugin("com.sksamuel.scapegoat" % "sbt-scapegoat" % "1.2.5")
addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.3.1")
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.3")

Expand Down

0 comments on commit 1531189

Please sign in to comment.