Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Aug 22, 2023
1 parent 0c981c2 commit 0e4fcfe
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ println(variable2.now()) // "Foo"
println(rx.now()) // "3 - Foo"
```

Apart from `Rx` which always has an initial value, there is `RxLater` which will eventually have a value (both extend RxState which extends RxSource). It also meant for representing state just without an initial state. It is lazy, distinct and has shared execution just like `Rx`.
Apart from `Rx` which always has an initial value, there is `RxLater` (and `VarLater`) which will eventually have a value (both extend RxState which extends RxSource). It also meant for representing state just without an initial state. It is lazy, distinct and has shared execution just like `Rx`.

```
import colibri.reactive._
Expand Down
2 changes: 1 addition & 1 deletion reactive/src/main/scala/colibri/reactive/Owner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object LiveOwner extends LiveOwnerPlatform {
}

def unsafeLive[A](rx: Rx[A]): A = {
ref.unsafeAdd(() => rx.observable.via(subject).unsafeSubscribe())
ref.unsafeAdd(() => rx.observable.unsafeSubscribe(subject))
rx.nowIfSubscribed()
}
}
Expand Down
21 changes: 13 additions & 8 deletions reactive/src/main/scala/colibri/reactive/Reactive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ trait RxEvent[+A] extends RxSource[A] with RxSourceSelf[RxEvent, RxEvent, A] {

object RxEvent extends RxPlatform {
private val _empty: RxEvent[Nothing] = observableUnshared(Observable.empty)
def empty[A]: RxEvent[A] = _empty
@inline def empty[A]: RxEvent[A] = _empty

def apply[A](values: A*): RxEvent[A] = iterable(values)
@inline def apply[A](values: A*): RxEvent[A] = iterable(values)

def iterable[A](values: Iterable[A]): RxEvent[A] = observableUnshared(Observable.fromIterable(values))

Expand All @@ -121,8 +121,8 @@ object RxEvent extends RxPlatform {
def switch[A](rxs: RxEvent[A]*): RxEvent[A] = observableUnshared(Observable.switchIterable(rxs.map(_.observable)))
def concat[A](rxs: RxEvent[A]*): RxEvent[A] = observableUnshared(Observable.concatIterable(rxs.map(_.observable)))

def observable[A](observable: Observable[A]): RxEvent[A] = observableUnshared(observable.publish.refCount)
private def observableUnshared[A](observable: Observable[A]): RxEvent[A] = new RxEventObservable(observable)
def observable[A](observable: Observable[A]): RxEvent[A] = new RxEventObservableShared(observable)
private def observableUnshared[A](observable: Observable[A]): RxEvent[A] = new RxEventObservableUnshared(observable)

@inline implicit final class RxEventOps[A](private val self: RxEvent[A]) extends AnyVal {
def toRxLater: RxLater[A] = RxLater.observable(self.observable)
Expand All @@ -144,14 +144,14 @@ trait RxLater[+A] extends RxState[A] with RxSourceSelf[RxLater, RxLater, A] {
}

object RxLater {
def empty[A]: RxLater[A] = RxLaterEmpty
@inline def empty[A]: RxLater[A] = RxLaterEmpty

def future[A](future: => Future[A]): RxLater[A] = observable(Observable.fromFuture(future))
def effect[F[_]: RunEffect, A](effect: F[A]): RxLater[A] = observable(Observable.fromEffect(effect))

def observable[A](observable: Observable[A]): RxLater[A] = new RxLaterObservable(observable)

def wrap[A](rx: Rx[A]): RxLater[A] = new RxLaterWrap(rx)
def rx[A](rx: Rx[A]): RxLater[A] = new RxLaterWrap(rx)

@inline implicit final class RxLaterOps[A](private val self: RxLater[A]) extends AnyVal {
def toRxEvent: RxEvent[A] = RxEvent.observable(self.observable)
Expand All @@ -166,6 +166,7 @@ trait Rx[+A] extends RxState[A] with RxSourceSelf[RxLater, Rx, A] {

def apply()(implicit owner: LiveOwner): A
def now()(implicit owner: NowOwner): A

def nowIfSubscribedOption(): Option[A]

final def nowIfSubscribed(): A = nowIfSubscribedOption().getOrElse(throw RxMissingNowException)
Expand Down Expand Up @@ -207,7 +208,7 @@ object Rx extends RxPlatform {
@inline implicit final class RxLaterOps[A](private val self: Rx[A]) extends AnyVal {
def toRxEvent: RxEvent[A] = RxEvent.observable(self.observable)

def toRxLater: RxLater[A] = RxLater.wrap(self)
def toRxLater: RxLater[A] = RxLater.rx(self)
}
}

Expand Down Expand Up @@ -381,7 +382,11 @@ object Var {

// RxEvent

private final class RxEventObservable[A](val observable: Observable[A]) extends RxEvent[A]
private final class RxEventObservableShared[A](inner: Observable[A]) extends RxEvent[A] {
val observable: Observable[A] = inner.publish.refCount
}

private final class RxEventObservableUnshared[A](val observable: Observable[A]) extends RxEvent[A]

// Rx

Expand Down

0 comments on commit 0e4fcfe

Please sign in to comment.