Skip to content

Commit

Permalink
Add Input caching to Rx implementations and remove NodeLifecycleAware…
Browse files Browse the repository at this point in the history
… from Connectables
  • Loading branch information
mapm14 committed Feb 5, 2024
1 parent 1150d68 commit c85a3c5
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.bumble.appyx.utils.interop.rx2.connectable

import com.bumble.appyx.navigation.plugin.NodeLifecycleAware
import com.jakewharton.rxrelay2.Relay

interface Connectable<Input, Output> : NodeLifecycleAware {
interface Connectable<Input, Output> {
val input: Relay<Input>
val output: Relay<Output>
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,106 @@ package com.bumble.appyx.utils.interop.rx2.connectable

import android.annotation.SuppressLint
import com.bumble.appyx.navigation.lifecycle.Lifecycle
import com.bumble.appyx.navigation.plugin.NodeLifecycleAware
import com.jakewharton.rxrelay2.PublishRelay
import com.jakewharton.rxrelay2.Relay
import io.reactivex.Observer

class NodeConnector<Input: Any, Output: Any>(
override val input: Relay<Input> = PublishRelay.create(),
) : Connectable<Input, Output> {
class NodeConnector<Input : Any, Output : Any> : Connectable<Input, Output>, NodeLifecycleAware {

override fun onCreate(lifecycle: Lifecycle) {
flushInputCache()
flushOutputCache()
}

private val intakeInput: Relay<Input> = PublishRelay.create()
private val exhaustInput: Relay<Input> = PublishRelay.create()
private var isInputFlushed = false
private val inputCache = mutableListOf<Input>()

override val input: Relay<Input> = object : Relay<Input>() {

override fun subscribeActual(observer: Observer<in Input>) {
exhaustInput.subscribe(observer)
}

override fun accept(value: Input) {
intakeInput.accept(value)
}

override fun hasObservers() = exhaustInput.hasObservers()

}

private val inputCacheSubscription = intakeInput.subscribe {
synchronized(this) {
if (!isInputFlushed) {
inputCache.add(it)
} else {
exhaustInput.accept(it)
switchToInputExhaust()
}
}
}

private fun flushInputCache() {
synchronized(this) {
if (isInputFlushed) error("Input already flushed")
isInputFlushed = true
inputCache.forEach { exhaustInput.accept(it) }
inputCache.clear()
}
}

private val intake: Relay<Output> = PublishRelay.create()
private val exhaust: Relay<Output> = PublishRelay.create()
private var isFlushed = false
@SuppressLint("CheckResult")
private fun switchToInputExhaust() {
intakeInput.subscribe { exhaustInput.accept(it) }
inputCacheSubscription.dispose()
}


private val intakeOutput: Relay<Output> = PublishRelay.create()
private val exhaustOutput: Relay<Output> = PublishRelay.create()
private var isOutputFlushed = false
private val outputCache = mutableListOf<Output>()

override val output: Relay<Output> = object : Relay<Output>() {

override fun subscribeActual(observer: Observer<in Output>?) {
exhaust.subscribe(observer as Observer<Output>)
override fun subscribeActual(observer: Observer<in Output>) {
exhaustOutput.subscribe(observer)
}

override fun accept(value: Output) {
intake.accept(value)
intakeOutput.accept(value)
}

override fun hasObservers() = exhaust.hasObservers()
override fun hasObservers() = exhaustOutput.hasObservers()

}

override fun onCreate(lifecycle: Lifecycle) {
flushOutputCache()
}

private val cacheSubscription = intake.subscribe {
private val outputCacheSubscription = intakeOutput.subscribe {
synchronized(this) {
if (!isFlushed) {
if (!isOutputFlushed) {
outputCache.add(it)
} else {
exhaust.accept(it)
switchToExhaust()
exhaustOutput.accept(it)
switchToOutputExhaust()
}
}
}

private fun flushOutputCache() {
synchronized(this) {
if (isFlushed) error("Already flushed")
isFlushed = true
outputCache.forEach { exhaust.accept(it) }
if (isOutputFlushed) error("Output already flushed")
isOutputFlushed = true
outputCache.forEach { exhaustOutput.accept(it) }
outputCache.clear()
}
}

@SuppressLint("CheckResult")
private fun switchToExhaust() {
intake.subscribe { exhaust.accept(it) }
cacheSubscription.dispose()
private fun switchToOutputExhaust() {
intakeOutput.subscribe { exhaustOutput.accept(it) }
outputCacheSubscription.dispose()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ class Rx2NodeConnectorTest {
}

sealed class Output {
object Output1 : Output()
object Output2 : Output()
object Output3 : Output()
data object Output1 : Output()
data object Output2 : Output()
data object Output3 : Output()
}

@AfterEach
Expand All @@ -57,7 +57,7 @@ class Rx2NodeConnectorTest {
}

@Test
fun `GIVEN nodeConnector onAttached is not called WHEN output is accepted THEN accepted output do not reach observer`() {
fun `GIVEN nodeConnector onCreate is not called WHEN output is accepted THEN accepted output do not reach observer`() {
val nodeConnector = NodeConnector<Nothing, Output>()
nodeConnector.output.subscribe(firstTestObserver)

Expand All @@ -67,7 +67,7 @@ class Rx2NodeConnectorTest {
}

@Test
fun `GIVEN an output is accepted before onAttached WHEN nodeConnector onAttached is called THEN accepted output reach the observer`() {
fun `GIVEN an output is accepted before onCreate WHEN nodeConnector onCreate is called THEN accepted output reach the observer`() {
val nodeConnector = NodeConnector<Nothing, Output>()
nodeConnector.output.subscribe(firstTestObserver)

Expand All @@ -78,7 +78,7 @@ class Rx2NodeConnectorTest {
}

@Test
fun `GIVEN nodeConnector is attached WHEN output is accepted THEN every accepted output reach the observer`() {
fun `GIVEN nodeConnector is created WHEN output is accepted THEN every accepted output reach the observer`() {
val nodeConnector = NodeConnector<Nothing, Output>()
nodeConnector.output.subscribe(firstTestObserver)

Expand All @@ -89,7 +89,7 @@ class Rx2NodeConnectorTest {
}

@Test
fun `GIVEN outputs accepted before and after onAttached WHEN node is attached THEN every accepted output reach the observer`() {
fun `GIVEN outputs accepted before and after onCreate WHEN node is created THEN every accepted output reach the observer`() {
val nodeConnector = NodeConnector<Nothing, Output>()
nodeConnector.output.subscribe(firstTestObserver)

Expand All @@ -102,7 +102,7 @@ class Rx2NodeConnectorTest {
}

@Test
fun `WHEN nodeConnector onAttached is called twice THEN error is raised`() {
fun `WHEN nodeConnector onCreate is called twice THEN error is raised`() {
val nodeConnector = NodeConnector<Nothing, Output>()

nodeConnector.onCreate(lifecycle)
Expand All @@ -112,7 +112,7 @@ class Rx2NodeConnectorTest {
}

@Test
fun `GIVEN multiple observers and output is accepted before OnAttached WHEN nodeConnector onAttached is called THEN every accepted output reach the observers`() {
fun `GIVEN multiple observers and output is accepted before onCreate WHEN nodeConnector onCreate is called THEN every accepted output reach the observers`() {
val nodeConnector = NodeConnector<Nothing, Output>()
nodeConnector.output.subscribe(firstTestObserver)
nodeConnector.output.subscribe(secondTestObserver)
Expand All @@ -125,7 +125,7 @@ class Rx2NodeConnectorTest {
}

@Test
fun `GIVEN multiple observers and nodeConnector is attached WHEN output is accepted THEN every accepted output reach the observer`() {
fun `GIVEN multiple observers and nodeConnector is created WHEN output is accepted THEN every accepted output reach the observer`() {
val nodeConnector = NodeConnector<Nothing, Output>()
nodeConnector.output.subscribe(firstTestObserver)
nodeConnector.output.subscribe(secondTestObserver)
Expand All @@ -138,19 +138,19 @@ class Rx2NodeConnectorTest {
}

@Test
fun `GIVEN multiple observers that subscribe before and after onAttached and outputs accepted before and after onAttached WHEN node is attached THEN every accepted output reach the observer`() {
fun `GIVEN multiple observers that subscribe before and after onCreate and outputs accepted before and after onCreate WHEN node is created THEN every accepted output reach the observer`() {
val nodeConnector = NodeConnector<Nothing, Output>()
//First subscriber subscribe BEFORE onAttached
//First subscriber subscribe BEFORE onCreate
nodeConnector.output.subscribe(firstTestObserver)

//Output accepted BEFORE onAttached
//Output accepted BEFORE onCreate
nodeConnector.output.accept(Output1)
nodeConnector.onCreate(lifecycle)

//Second subscriber subscribe AFTER onAttached
//Second subscriber subscribe AFTER onCreate
nodeConnector.output.subscribe(secondTestObserver)

//Outputs accepted AFTER onAttached
//Outputs accepted AFTER onCreate
nodeConnector.output.accept(Output2)
nodeConnector.output.accept(Output3)

Expand All @@ -161,7 +161,7 @@ class Rx2NodeConnectorTest {


@Test
fun `WHEN multiple output are accepted from multiple threads THEN output is correctly received when onAttached is called`() {
fun `WHEN multiple output are accepted from multiple threads THEN output is correctly received when onCreate is called`() {
val nodeConnector = NodeConnector<Nothing, Output>()
val threadNumber = 100
val iterations = 10000
Expand Down Expand Up @@ -208,7 +208,7 @@ class Rx2NodeConnectorTest {
* % of failure when race condition issue is present.
*/
@RepeatedTest(1000)
fun `WHEN accept and onAttached are called by different thread at the same time THEN output is the expected`() {
fun `WHEN accept and onCreate are called by different thread at the same time THEN output is the expected`() {
val nodeConnector1 = NodeConnector<Nothing, Output>()
val nodeConnector2 = NodeConnector<Nothing, Output>()
val threadNumber = 2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.bumble.appyx.utils.interop.rx3.connectable

import com.bumble.appyx.navigation.plugin.NodeLifecycleAware
import com.jakewharton.rxrelay3.Relay

interface Connectable<Input, Output> : NodeLifecycleAware {
interface Connectable<Input, Output> {
val input: Relay<Input>
val output: Relay<Output>
}
Loading

0 comments on commit c85a3c5

Please sign in to comment.