Skip to content

Commit 7f2394f

Browse files
authored
Merge pull request #11 from precog/more-resilient-watch-streams
More resilient watch streams
2 parents d73e64e + 142d2e3 commit 7f2394f

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

core/src/main/scala/mongo4cats/collection/queries/WatchQueryBuilder.scala

+13
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import com.mongodb.reactivestreams.client.{
3232
import fs2.Stream
3333
import mongo4cats.helpers._
3434
import mongo4cats.bson.BsonDecoder
35+
import mongo4cats.bson.BsonDecodeError
3536
import mongo4cats.bson.syntax._
3637
import mongo4cats.client.ClientSession
3738
import mongo4cats.collection.operations.Aggregate
@@ -59,6 +60,7 @@ trait WatchQueryBuilder[F[_]] {
5960

6061
//
6162
def stream: Stream[F, ChangeStreamDocument[BsonValue]]
63+
def updateStreamAttempt[A: BsonDecoder](onError: BsonDecodeError => F[Unit]): Stream[F, A]
6264
def updateStream[A: BsonDecoder]: Stream[F, A]
6365

6466
def mapK[G[_]](f: F ~> G): WatchQueryBuilder[G]
@@ -120,6 +122,17 @@ object WatchQueryBuilder {
120122
def stream =
121123
boundedStreamF(1).translate(transform)
122124

125+
def updateStreamAttempt[A: BsonDecoder](onError: BsonDecodeError => G[Unit]) =
126+
boundedStreamF(1)
127+
.map(_.getFullDocument.as[A])
128+
.translate(transform)
129+
.flatMap {
130+
case Left(decodeError) =>
131+
Stream.exec(onError(decodeError))
132+
case Right(elem) =>
133+
Stream(elem)
134+
}
135+
123136
def updateStream[A: BsonDecoder] =
124137
boundedStreamF(1).map(_.getFullDocument).evalMap(_.as[A].liftTo[F]).translate(transform)
125138

version.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version in ThisBuild := "0.6.0"
1+
version in ThisBuild := "0.7.0"

0 commit comments

Comments
 (0)