Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for missing pre-binding events #156

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions binder/src/main/java/com/badoo/binder/AccumulatorSubject.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.badoo.binder

import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.functions.Consumer
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicBoolean

interface Drainable {
fun drain()
}

class AccumulatorSubject<T>(
private val initialState: T? = null
) : ObservableSource<T>, Consumer<T>, Drainable {

private val items: MutableList<T> = mutableListOf()
private val events: PublishSubject<T> = PublishSubject.create()

private var drained = AtomicBoolean(false)

init {
initialState?.also { items.add(it) }
}

override fun subscribe(observer: Observer<in T>) {
events.subscribe(observer)
if (!drained.get()) {
items.forEach { observer.onNext(it) }
} else {
initialState?.also { observer.onNext(it) }
}
}

override fun accept(value: T?) {
value?.also {
if (!drained.get()) {
items.add(value)
}
events.onNext(value)
}
}

override fun drain() {
if (!drained.get()) {
drained.set(true)
items.clear()
}
}

companion object {
fun <T> create() = AccumulatorSubject<T>()
}
}
42 changes: 26 additions & 16 deletions binder/src/main/java/com/badoo/binder/Binder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import io.reactivex.rxkotlin.plusAssign
class Binder(
private val lifecycle: Lifecycle? = null,
) : Disposable {
private var drained: Boolean = false
private val bindings = mutableListOf<Binding>()
private val disposables = CompositeDisposable()
private val connections = mutableListOf<Pair<Connection<*, *>, Middleware<*, *>?>>()
private val connectionDisposables = CompositeDisposable()
private var isActive = false

Expand Down Expand Up @@ -51,21 +52,27 @@ class Binder(

when {
lifecycle != null -> {
connections += (connection to middleware)
if (isActive) {
subscribeWithLifecycle(connection, middleware)
with(Binding(connection, middleware)) {
bindings.add(this)
if (drained) {
this.drain()
}
accumulate()
if (isActive) {
subscribeWithLifecycle<In>(this)
}
}
}
else -> subscribe(connection, middleware)
}
}

private fun <Out, In> subscribeWithLifecycle(
connection: Connection<Out, In>,
middleware: Middleware<Out, In>?
) {
connectionDisposables += wrap(connection.from)
.subscribeWithMiddleware(connection, middleware)
private fun <In> subscribeWithLifecycle(binding: Binding) {
connectionDisposables += wrap(binding.source)
.subscribeWithMiddleware(
binding.connection as Connection<Any?, In>,
binding.middleware as? Middleware<Any?, In>,
)
}

private fun <Out, In> subscribe(
Expand Down Expand Up @@ -120,12 +127,8 @@ class Binder(

private fun bindConnections() {
isActive = true
connections.forEach { (connection, middleware) ->
subscribeWithLifecycle(
connection as Connection<Any, Any>,
middleware as? Middleware<Any, Any>
)
}
bindings.forEach { it.accumulate() }
bindings.forEach { subscribeWithLifecycle<Any>(it) }
}

private fun unbindConnections() {
Expand Down Expand Up @@ -160,6 +163,13 @@ class Binder(
this
}

fun drain() {
if (!drained) {
bindings.forEach { it.drain() }
drained = true
}
}

class BinderObserveOnScope(
private val binder: Binder,
private val observeScheduler: Scheduler
Expand Down
25 changes: 25 additions & 0 deletions binder/src/main/java/com/badoo/binder/Binding.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.badoo.binder

import com.badoo.binder.middleware.base.Middleware
import io.reactivex.ObservableSource
import io.reactivex.subjects.UnicastSubject

internal class Binding(
val connection: Connection<*, *>,
val middleware: Middleware<*, *>?
) {

var source: ObservableSource<*>? = null
private set

fun accumulate() {
source = connection.from?.let { source ->
UnicastSubject.create<Any>()
.also { observer -> source.subscribe(observer) }
}
}

fun drain() {
(connection.from as? Drainable)?.drain()
}
}
116 changes: 116 additions & 0 deletions binder/src/test/java/com/badoo/binder/AccumulatorSubjectTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.badoo.binder

import com.badoo.binder.lifecycle.Lifecycle
import com.badoo.binder.lifecycle.ManualLifecycle
import io.reactivex.functions.Consumer
import io.reactivex.subjects.PublishSubject
import org.junit.jupiter.api.Test

class AccumulatorSubjectTest {

@Test
fun `GIVEN the producer is an accumulator AND the consumer subscribes after the lifecycle started AND before the drain THEN consumes all the events produced`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.bind(producer to Consumer { consumer.onNext(it) })
binder.drain()

testObserver.onComplete()
testObserver.assertValues(0, 1)
}

@Test
fun `GIVEN the producer is an accumulator AND the consumer subscribes after lifecycle started AND before the drain AND producer produces an event after the drain THEN consumes all the produced events`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.bind(producer to Consumer { consumer.onNext(it) })
binder.drain()
producer.accept(score++)

testObserver.onComplete()
testObserver.assertValues(0, 1, 2)
}

@Test
fun `GIVEN the producer is an accumulator AND the consumer subscribes after the lifecycle started AND after the drain THEN the consumer consumes only the events produced after the drain`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.drain()
binder.bind(producer to Consumer { consumer.onNext(it) })
producer.accept(score++)

testObserver.onComplete()
testObserver.assertValues(2)
}

@Test
fun `GIVEN the producer is an accumulator WHEN the consumer subscribes after the lifecycle started AND before the drain THEN should receive events produced before the drain and published when the lifecycle is active`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.bind(producer to Consumer { consumer.onNext(it) })
binder.drain()
lifecycle.end()
producer.accept(score++)
lifecycle.begin()
producer.accept(score++)

testObserver.onComplete()
testObserver.assertValues(0, 1, 3)
}

@Test
fun `GIVEN the producer is an accumulator WHEN the consumer subscribes after the lifecycle restarted THEN should receive only the events after the restart when the lifecycle is active`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.drain()
lifecycle.end()
binder.bind(producer to Consumer { consumer.onNext(it) })
producer.accept(score++)
lifecycle.begin()
producer.accept(score++)

testObserver.onComplete()
testObserver.assertValues(3)
}

}
Loading