Skip to content

Commit 14227dc

Browse files
Merge pull request #16 from precog/reactivestreams-to-flow
reactivestreams to flow
2 parents b33b985 + 9a70664 commit 14227dc

File tree

11 files changed

+36
-32
lines changed

11 files changed

+36
-32
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
strategy:
2323
matrix:
2424
os: [ubuntu-latest]
25-
scala: [2.12.14, 2.13.7]
25+
scala: [2.12.14, 2.13.12]
2626
2727
runs-on: ${{ matrix.os }}
2828
steps:

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import ReleaseTransformations._
33
import microsites.CdnDirectives
44

55
lazy val scala212 = "2.12.14"
6-
lazy val scala213 = "2.13.7"
6+
lazy val scala213 = "2.13.12"
77
lazy val supportedScalaVersions = List(scala212, scala213)
88

99
ThisBuild / scalaVersion := scala213

circe/src/main/scala/mongo4cats/circe.scala

+5-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ object circe extends JsonCodecs {
3939
}
4040

4141
object implicits {
42-
implicit def circeEncoderToEncoder[A: Encoder] = new BsonEncoder[A] {
42+
implicit def circeEncoderToEncoder[A: Encoder]: BsonEncoder[A] = new BsonEncoder[A] {
4343
def apply(a: A): BsonValue = {
4444
val json = a.asJson
4545
val wrapped = Json.obj(RootTag := json)
@@ -48,12 +48,14 @@ object circe extends JsonCodecs {
4848
}
4949
}
5050

51-
implicit def circeDecoderToDecoder[A: Decoder] = new BsonDecoder[A] {
51+
implicit def circeDecoderToDecoder[A: Decoder]: BsonDecoder[A] = new BsonDecoder[A] {
52+
53+
val decoder = Decoder.instance[A](_.as[A])
54+
5255
def apply(b: BsonValue) = {
5356
val doc = BsonDocument(RootTag -> (if (b == null) new BsonNull else b)).toJson()
5457
val json = parser.parse(doc)
5558
val jsonWithoutRoot = json.flatMap(_.hcursor.get[Json](RootTag))
56-
val decoder = Decoder.instance[A](_.as[A])
5759
jsonWithoutRoot
5860
.flatMap(decoder.decodeJson(_))
5961
.leftMap(x =>

circe/src/test/scala/mongo4cats/MongoCollectionSpec.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@ import java.time.{Instant, LocalDate}
3434
import java.time.temporal.ChronoField.MILLI_OF_SECOND
3535
import java.time.temporal.ChronoUnit
3636
import scala.concurrent.Future
37+
import mongo4cats.bson.BsonDocumentEncoder
3738

3839
class MongoCollectionSpec extends AsyncWordSpec with Matchers with EmbeddedMongo {
3940

4041
import MongoCollectionSpec._
4142

42-
implicit val personEnc = unsafe.circeDocumentEncoder[Person]
43-
implicit val paymentEnc = unsafe.circeDocumentEncoder[Payment]
43+
implicit val personEnc: BsonDocumentEncoder[Person] = unsafe.circeDocumentEncoder[Person]
44+
implicit val paymentEnc: BsonDocumentEncoder[Payment] = unsafe.circeDocumentEncoder[Payment]
4445

4546
override val mongoPort: Int = 12348
4647

circe/src/test/scala/mongo4cats/circe.scala

+8-7
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,21 @@ class CirceSpec extends AnyWordSpec with Matchers with EitherValues {
2727

2828
"circe conversions" should {
2929
"decode null as if it was Json.null" in {
30-
circe.implicits.circeDecoderToDecoder[Unit](Decoder.instance { c =>
31-
c.value.asNull.toRight(DecodingFailure("wasn't null!", Nil))
32-
}).apply(null) shouldBe Right(())
30+
circe.implicits
31+
.circeDecoderToDecoder[Unit](Decoder.instance { c =>
32+
c.value.asNull.toRight(DecodingFailure("wasn't null!", Nil))
33+
})
34+
.apply(null) shouldBe Right(())
3335
}
3436

3537
"not report the internal root tag in history when reporting errors" in {
3638

37-
val deco = Decoder.instance(h => {
38-
h.get[String]("hek")(Decoder.failedWithMessage("Bad!"))
39-
})
39+
val deco =
40+
Decoder.instance(h => h.get[String]("hek")(Decoder.failedWithMessage("Bad!")))
4041

4142
val res = circe.implicits.circeDecoderToDecoder[String](deco).apply(new BsonString("hek"))
4243

43-
res.left.value.msg shouldBe "An error occured during decoding BsonValue BsonString{value='hek'}: DecodingFailure(Attempt to decode value on failed cursor, List(DownField(hek)))"
44+
res.left.value.msg shouldBe "An error occured during decoding BsonValue BsonString{value='hek'}: DecodingFailure at .hek: Missing required field"
4445

4546
}
4647
}

core/src/main/scala/mongo4cats/bson/BsonDecoder.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ object BsonDocumentDecoder extends LowLevelDocumentDecoder {
5050
}
5151

5252
trait LowLevelDocumentDecoder {
53-
implicit def narrowDecoder[A: BsonDecoder] = BsonDocumentDecoder.instance[A] {
54-
(b: BsonDocument) =>
53+
implicit def narrowDecoder[A: BsonDecoder]: BsonDocumentDecoder[A] =
54+
BsonDocumentDecoder.instance[A] { (b: BsonDocument) =>
5555
BsonDecoder[A].apply(b: BsonValue)
56-
}
56+
}
5757
}
5858

5959
object BsonDecoder {

core/src/main/scala/mongo4cats/helpers.scala

+4-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ package mongo4cats
1818

1919
import cats.effect.Async
2020
import fs2.Stream
21-
import fs2.interop.reactivestreams
21+
import fs2.interop.flow
2222
import org.reactivestreams.Publisher
23+
import org.reactivestreams.FlowAdapters
2324

2425
object helpers {
2526

@@ -36,9 +37,9 @@ object helpers {
3637
boundedStream(1).compile.drain
3738

3839
def stream[F[_]: Async]: Stream[F, T] =
39-
reactivestreams.fromPublisher(publisher, DefaultStreamChunkSize)
40+
flow.fromPublisher(FlowAdapters.toFlowPublisher(publisher), DefaultStreamChunkSize)
4041

4142
def boundedStream[F[_]: Async](chunkSize: Int): Stream[F, T] =
42-
reactivestreams.fromPublisher(publisher, chunkSize)
43+
flow.fromPublisher(FlowAdapters.toFlowPublisher(publisher), chunkSize)
4344
}
4445
}

examples/src/main/scala/mongo4cats/examples/CaseClassesWithCirceCodecs.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import mongo4cats.circe.unsafe
2424
import mongo4cats.embedded.EmbeddedMongo
2525

2626
import java.time.Instant
27+
import mongo4cats.bson.BsonDocumentEncoder
2728

2829
object CaseClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMongo {
2930

@@ -35,8 +36,8 @@ object CaseClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMongo {
3536
registrationDate: Instant
3637
)
3738

38-
implicit val addressEnc = unsafe.circeDocumentEncoder[Address]
39-
implicit val personEnc = unsafe.circeDocumentEncoder[Person]
39+
implicit val addressEnc: BsonDocumentEncoder[Address] = unsafe.circeDocumentEncoder[Address]
40+
implicit val personEnc: BsonDocumentEncoder[Person] = unsafe.circeDocumentEncoder[Person]
4041

4142
override val run: IO[Unit] =
4243
withRunningEmbeddedMongo("localhost", 27017) {

examples/src/main/scala/mongo4cats/examples/DistinctNestedClassesWithCirceCodecs.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import mongo4cats.client.MongoClient
2424
import mongo4cats.embedded.EmbeddedMongo
2525

2626
import java.time.Instant
27+
import mongo4cats.bson.BsonDocumentEncoder
2728

2829
object DistinctNestedClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMongo {
2930

@@ -35,8 +36,8 @@ object DistinctNestedClassesWithCirceCodecs extends IOApp.Simple with EmbeddedMo
3536
registrationDate: Instant
3637
)
3738

38-
implicit val addressEnc = unsafe.circeDocumentEncoder[Address]
39-
implicit val personEnc = unsafe.circeDocumentEncoder[Person]
39+
implicit val addressEnc: BsonDocumentEncoder[Address] = unsafe.circeDocumentEncoder[Address]
40+
implicit val personEnc: BsonDocumentEncoder[Person] = unsafe.circeDocumentEncoder[Person]
4041

4142
override val run: IO[Unit] =
4243
withRunningEmbeddedMongo("localhost", 27017) {

project/Dependencies.scala

+4-7
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@ import sbt._
22

33
object Dependencies {
44
private object Versions {
5-
val mongodb = "4.4.0"
6-
val fs2 = "3.2.4"
5+
val mongodb = "4.11.1"
6+
val fs2 = "3.10.2"
77
val scalaCompat = "2.6.0"
8-
val circe = "0.14.1"
8+
val circe = "0.14.6"
99
val findbugsJsr305Version = "1.3.9"
1010

1111
val logback = "1.2.10"
1212
val scalaTest = "3.2.10"
1313

14-
val testContainers = "0.39.12"
14+
val testContainers = "0.40.10"
1515

1616
val embeddedMongo = "3.2.5"
1717
val immutableValue = "2.8.8"
@@ -25,7 +25,6 @@ object Dependencies {
2525
val findbugsJsr305Version = "com.google.code.findbugs" % "jsr305" % Versions.findbugsJsr305Version % Provided
2626

2727
val fs2Core = "co.fs2" %% "fs2-core" % Versions.fs2
28-
val fs2RS = "co.fs2" %% "fs2-reactive-streams" % Versions.fs2
2928
val scalaCompat = "org.scala-lang.modules" %% "scala-collection-compat" % Versions.scalaCompat
3029

3130
val circeCore = "io.circe" %% "circe-core" % Versions.circe
@@ -49,7 +48,6 @@ object Dependencies {
4948
Libraries.mongodbDriverStreams,
5049
Libraries.findbugsJsr305Version,
5150
Libraries.fs2Core,
52-
Libraries.fs2RS,
5351
Libraries.scalaCompat
5452
)
5553

@@ -70,7 +68,6 @@ object Dependencies {
7068

7169
lazy val embedded = Seq(
7270
Libraries.fs2Core,
73-
Libraries.fs2RS,
7471
Libraries.embeddedMongo,
7572
Libraries.immutableValue,
7673
Libraries.commonsCompress

version.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version in ThisBuild := "0.7.1"
1+
version in ThisBuild := "0.7.2"

0 commit comments

Comments
 (0)