Skip to content

Commit

Permalink
#793 First shot at Stream interop
Browse files Browse the repository at this point in the history
  • Loading branch information
timo-schmid committed Apr 27, 2019
1 parent 04ec293 commit a35e13b
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 1 deletion.
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ lazy val interopCats = crossProject(JSPlatform, JVMPlatform)
"co.fs2" %%% "fs2-core" % "1.0.4" % Test
)
)
.dependsOn(core % "test->test;compile->compile")
.dependsOn(
core % "test->test;compile->compile",
// TODO Shoud we create a separate module instead of depending here?
streams % "test->test;compile->compile"
)

val CatsScalaCheckVersion = Def.setting {
CrossVersion.partialVersion(scalaVersion.value) match {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package scalaz.zio.interop.stream

trait Sink[F[_], +A0, -A, +B] {

}
141 changes: 141 additions & 0 deletions interop-cats/jvm/src/main/scala/scalaz/zio/interop/stream/Stream.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package scalaz.zio.interop.stream

import cats.~>
import cats.effect.Effect
import cats.implicits._
import scalaz.zio.{Chunk, Queue, ZIO, ZManaged, ZSchedule}
import scalaz.zio.stream.{Take, ZSink, ZStream}

final class Stream[F[+ _], +A] private[stream] (private[stream] val underlying: ZStream[Any, Throwable, A])(
implicit liftIO: ZIO[Any, Throwable, ?] ~> F,
liftZIO: F ~> ZIO[Any, Throwable, ?],
effect: Effect[F]
) { self =>

import Stream.Fold

def fold[A1 >: A, S]: Fold[F, A1, S] = ???

def foldLeft[A1 >: A, S](s: S)(f: (S, A1) => F[S]): F[S] = {
(fold[A, S]: F[(S, S => Boolean, (S, A) => F[S]) => F[S]]).flatMap(_(s, _ => false, f))
}

def ++[A1 >: A](that: => Stream[F, A1]): Stream[F, A1] =
new Stream(underlying ++ that.underlying)

def drain: Stream[F, Nothing] =
new Stream(underlying.drain)

def filter(pred: A => Boolean): Stream[F, A] =
new Stream(underlying.filter(pred))

def filterM(pred: A => F[Boolean]): Stream[F, A] =
new Stream(underlying.filterM(pred.andThen(liftZIO.apply)))

def filterNot(pred: A => Boolean): Stream[F, A] =
filter(a => !pred(a))

def foreachWhile(f: A => F[Boolean]): F[Unit] =
liftIO.apply(underlying.foreachWhile(f.andThen(liftZIO.apply)))

def collect[B](pf: PartialFunction[A, B]): Stream[F, B] =
new Stream(underlying.collect(pf))

def drop(n: Int): Stream[F, A] =
new Stream(underlying.drop(n))

def dropWhile(pred: A => Boolean): Stream[F, A] =
new Stream(underlying.dropWhile(pred))

def flatMap[B](f: A => Stream[F, B]): Stream[F, B] =
new Stream(underlying.flatMap(f.andThen(_.underlying)))

def foreach(f: A => F[Unit]): F[Unit] =
liftIO(underlying.foreach(f.andThen(liftZIO.apply)))

def forever: Stream[F, A] =
new Stream(underlying.forever)

def map[B](f: A => B): Stream[F, B] =
new Stream(underlying.map(f))

// TODO Should we expose Chunk to clients?
def mapConcat[B](f: A => Chunk[B]): Stream[F, B] =
new Stream(underlying.mapConcat(f))

def mapM[B](f: A => F[B]): Stream[F, B] =
new Stream(underlying.mapM(f.andThen(liftZIO.apply)))

def merge[A1 >: A](that: Stream[F, A1], capacity: Int = 1): Stream[F, A1] =
new Stream(underlying.merge(that.underlying, capacity))

def mergeEither[B](that: Stream[F, B], capacity: Int = 1): Stream[F, Either[A, B]] =
new Stream(underlying.mergeEither(that.underlying, capacity))

def mergeWith[B, C](that: Stream[F, B], capacity: Int = 1)(l: A => C, r: B => C): Stream[F, C] =
new Stream(underlying.mergeWith(that.underlying, capacity)(l, r))

// TODO Sink, Managed
def peel[A1 >: A, B](sink: ZSink[Any, Throwable, A1, A1, B]): ZManaged[Any, Throwable, (B, Stream[F, A1])] = ???

// TODO Schedule #790
// Find out how to deal correctly with "with Clock"
def repeat(schedule: ZSchedule[Any, Unit, _]): Stream[F, A] = ???
// new Stream(underlying.repeat(schedule))

// TODO Schedule #790
// Find out how to deal correctly with "with Clock"
def repeatElems[B](schedule: ZSchedule[Any, A, B]): Stream[F, A] = ???
// new Stream(underlying.repeatElems(schedule))

// TODO Sink
def run[A0, A1 >: A, B](sink: ZSink[Any, Throwable, A0, A1, B]): F[B] =
liftIO(underlying.run(sink))

def mapAccum[S1, B](s1: S1)(f1: (S1, A) => (S1, B)): Stream[F, B] =
new Stream(underlying.mapAccum(s1)(f1))

def mapAccumM[S1, B](s1: S1)(f1: (S1, A) => F[(S1, B)]): Stream[F, B] =
new Stream(underlying.mapAccumM(s1) { (s1, a) => liftZIO(f1(s1, a)) })

def take(n: Int): Stream[F, A] =
new Stream(underlying.take(n))

def takeWhile(pred: A => Boolean): Stream[F, A] =
new Stream(underlying.takeWhile(pred))

def collectWhile[B](pred: PartialFunction[A, B]): Stream[F, B] =
new Stream(underlying.collectWhile(pred))

// TODO ZManaged -> Managed
def toQueue[A1 >: A](capacity: Int = 1): ZManaged[Any, Nothing, Queue[Take[Throwable, A1]]] = ???

// TODO ZSink -> Sink
def transduce[A1 >: A, C](sink: ZSink[Any, Throwable, A1, A1, C]): Stream[F, C] =
new Stream(underlying.transduce(sink))

def tap(f: A => F[_]): Stream[F, A] =
new Stream(underlying.tap { a =>
liftZIO(f(a))
})

def zip[B](that: Stream[F, B], lc: Int = 1, rc: Int = 1): Stream[F, (A, B)] =
new Stream(underlying.zip(that.underlying, lc, rc))

def zipWith[B, C](that: Stream[F, B], lc: Int = 1, rc: Int = 1)(f: (Option[A], Option[B]) => Option[C]): Stream[F, C] =
new Stream(underlying.zipWith(that.underlying, lc, rc)(f))

def zipWithIndex: Stream[F, (A, Int)] =
new Stream(underlying.zipWithIndex)

// TODO
// def mapK[G[+ _]](f: F ~> G): Stream[G, A] =
// new Stream(underlying)(f compose liftIO, effect)

}

object Stream {

type Fold[F[+ _], +A, S] = F[(S, S => Boolean, (S, A) => F[S]) => F[S]]

}

0 comments on commit a35e13b

Please sign in to comment.