Skip to content

Commit

Permalink
Fix coordinator bug
Browse files Browse the repository at this point in the history
  • Loading branch information
zhxnlai committed Jan 17, 2019
1 parent abf165a commit bcf69f4
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 42 deletions.
6 changes: 5 additions & 1 deletion scarlet/src/main/java/com/tinder/scarlet/Scarlet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.tinder.scarlet.internal.utils.RuntimePlatform
import com.tinder.scarlet.internal.utils.StreamAdapterResolver
import com.tinder.scarlet.lifecycle.DefaultLifecycle
import com.tinder.scarlet.lifecycle.FlowableLifecycle
import com.tinder.scarlet.lifecycle.LifecycleRegistry
import com.tinder.scarlet.messageadapter.builtin.BuiltInMessageAdapterFactory
import com.tinder.scarlet.retry.BackoffStrategy
import com.tinder.scarlet.retry.ExponentialBackoffStrategy
Expand Down Expand Up @@ -55,6 +56,7 @@ class Scarlet private constructor(
StateMachineFactory(),
session,
LifecycleEventSource(
getScheduler(configuration.debug),
parentScope() ?: configuration.lifecycle
),
TimerEventSource(
Expand Down Expand Up @@ -109,8 +111,10 @@ class Scarlet private constructor(
else -> Flowable.empty()
}
}
val lifecycleRegistry = LifecycleRegistry()
parentConnectionOpenFlowable.subscribe(lifecycleRegistry)
return configuration.lifecycle
.combineWith(FlowableLifecycle(parentConnectionOpenFlowable))
.combineWith(lifecycleRegistry)
}

data class Configuration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import com.tinder.scarlet.Event
import com.tinder.scarlet.SideEffect
import com.tinder.scarlet.State
import com.tinder.scarlet.StateTransition
import com.tinder.scarlet.utils.toStream
import com.tinder.scarlet.internal.stub.StubInterface
import com.tinder.scarlet.internal.stub.StubMethod
import com.tinder.scarlet.utils.toStream
import io.reactivex.Flowable
import io.reactivex.Scheduler
import io.reactivex.processors.PublishProcessor
Expand All @@ -28,8 +28,8 @@ internal class Coordinator(
private val publishProcessor = PublishProcessor.create<StateTransition>()

fun start() {
lifecycleEventSource.start(this)
session.start(this)
lifecycleEventSource.start(this)
}

@Synchronized
Expand All @@ -52,6 +52,29 @@ internal class Coordinator(
@Synchronized
override fun onEvent(event: Event) {
val transition = stateMachine.transition(event) as? StateMachine.Transition.Valid ?: return

when (transition.toState) {
is State.WillConnect -> {
lifecycleEventSource.resume()
}
is State.Connecting -> {
lifecycleEventSource.pause()
}
is State.Connected -> {
lifecycleEventSource.resume()
}
is State.Disconnecting -> {
lifecycleEventSource.pause()
}
is State.Disconnected -> {
lifecycleEventSource.resume()
}
is State.Destroyed -> {
session.stop()
lifecycleEventSource.stop()
}
}

with(transition.sideEffect) {
when (this) {
is SideEffect.OpenProtocol -> {
Expand Down Expand Up @@ -82,17 +105,7 @@ internal class Coordinator(
)

when (transition.toState) {
is State.WillConnect -> {
lifecycleEventSource.requestNext()
}
is State.Connected -> {
lifecycleEventSource.requestNext()
}
is State.Disconnected -> {
lifecycleEventSource.requestNext()
}
is State.Destroyed -> {
session.stop()
publishProcessor.onComplete()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,79 @@ package com.tinder.scarlet.internal.coordinator
import com.tinder.scarlet.Event
import com.tinder.scarlet.Lifecycle
import com.tinder.scarlet.LifecycleState
import io.reactivex.Flowable
import io.reactivex.Scheduler
import io.reactivex.subscribers.DisposableSubscriber
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

internal class LifecycleEventSource(
private val scheduler: Scheduler,
private val lifecycle: Lifecycle
) {

private val lifecycleStateSubscriber = LifecycleStateSubscriber()
private lateinit var eventCallback: EventCallback
private var lifecycleStateSubscriber: LifecycleStateSubscriber? = null
private var eventCallback: EventCallback? = null

fun start(eventCallback: EventCallback) {
this.eventCallback = eventCallback
lifecycle.subscribe(lifecycleStateSubscriber)
lifecycleStateSubscriber = LifecycleStateSubscriber()
Flowable.fromPublisher(lifecycle)
.observeOn(scheduler)
.subscribe(lifecycleStateSubscriber)
resume()
}

fun requestNext() {
lifecycleStateSubscriber.requestNext()
fun resume() {
lifecycleStateSubscriber?.resume()
}

fun pause() {
lifecycleStateSubscriber?.pause()
}

fun stop() {
eventCallback = null
lifecycleStateSubscriber?.dispose()
lifecycleStateSubscriber = null
}

private inner class LifecycleStateSubscriber : DisposableSubscriber<LifecycleState>() {
private val pendingRequestCount = AtomicInteger()

override fun onStart() = request(1)
private val lastUndeliveredLifecycleState = AtomicReference<LifecycleState>()
private val isResumed = AtomicBoolean()

override fun onNext(lifecycleState: LifecycleState) {
val value = pendingRequestCount.decrementAndGet()
if (value < 0) {
pendingRequestCount.set(0)
}
eventCallback.onEvent(Event.OnLifecycleStateChange(lifecycleState))
lastUndeliveredLifecycleState.set(lifecycleState)
flushIfNeeded()
}

override fun onComplete() {
eventCallback.onEvent(Event.OnLifecycleStateChange(LifecycleState.Completed))
lastUndeliveredLifecycleState.set(LifecycleState.Completed)
flushIfNeeded()
}

override fun onError(throwable: Throwable) = throw throwable

fun requestNext() {
if (pendingRequestCount.get() == 0) {
pendingRequestCount.incrementAndGet()
request(1)
fun resume() {
if (isResumed.get()) {
return
}
isResumed.set(true)
flushIfNeeded()
}

fun pause() {
isResumed.set(false)
}

private fun flushIfNeeded() {
if (!isResumed.get()) {
return
}
val lifecycleState = lastUndeliveredLifecycleState.getAndSet(null)
if (lifecycleState != null) {
eventCallback?.onEvent(Event.OnLifecycleStateChange(lifecycleState))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,30 @@ internal class Session(
}

fun openSession() {
val session = channelDefinition ?: return
val openRequest = session.openRequestFactory.create(session.channel)
session.channel.open(openRequest)
val channelDefinition = checkNotNull(channelDefinition)
val openRequest = channelDefinition.openRequestFactory.create(channelDefinition.channel)
channelDefinition.channel.open(openRequest)
}

fun send(message: Message): Boolean {
val session = channelDefinition ?: return false
val messageQueue = session.messageQueue ?: return false
val metaData = session.sendingMessageMetaDataFactory.create(session.channel, message)
val channelDefinition = checkNotNull(channelDefinition)
val messageQueue = channelDefinition.messageQueue ?: return false
val metaData = channelDefinition.sendingMessageMetaDataFactory.create(
channelDefinition.channel,
message
)
return messageQueue.send(message, metaData)
}

fun closeSession() {
val session = channelDefinition ?: return
val closeRequest = session.closeRequestFactory.create(session.channel)
session.channel.close(closeRequest)
val channelDefinition = checkNotNull(channelDefinition)
val closeRequest = channelDefinition.closeRequestFactory.create(channelDefinition.channel)
channelDefinition.channel.close(closeRequest)
}

fun forceCloseSession() {
val session = channelDefinition ?: return
session.channel.forceClose()
val channelDefinition = checkNotNull(channelDefinition)
channelDefinition.channel.forceClose()
}

inner class Listener : Channel.Listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ internal class TimerEventSource(
fun stop() {
eventSourceCallback = null
subscriber?.dispose()
subscriber = null
}

private inner class RetryTimerSubscriber : DisposableSubscriber<Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@

package com.tinder.scarlet.internal.coordinator

// TODO
class CoordinatorTest
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* © 2019 Match Group, LLC.
*/

package com.tinder.scarlet.internal.coordinator

import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.only
import com.nhaarman.mockito_kotlin.then
import com.tinder.scarlet.Event
import com.tinder.scarlet.LifecycleState
import com.tinder.scarlet.lifecycle.LifecycleRegistry
import io.reactivex.schedulers.TestScheduler
import org.junit.Test

class LifecycleEventSourceTest {

private val testScheduler = TestScheduler()
private val lifecycleRegistry = LifecycleRegistry()
private val lifecycleEventSource = LifecycleEventSource(testScheduler, lifecycleRegistry)

private val eventCallback = mock<EventCallback>()

@Test
fun start_givenLifecycleState_shouldNotify() {
// Given
lifecycleRegistry.onNext(LifecycleState.Started)

// When
lifecycleEventSource.start(eventCallback)
testScheduler.triggerActions()

// Then
then(eventCallback).should().onEvent(Event.OnLifecycleStateChange(LifecycleState.Started))
}

@Test
fun start_givenNoLifecycleState_shouldNotNotify() {
// When
lifecycleEventSource.start(eventCallback)
testScheduler.triggerActions()

// Then
then(eventCallback).shouldHaveZeroInteractions()
}

@Test
fun stop_givenLifecycleState_shouldNotNotify() {
// Given
lifecycleRegistry.onNext(LifecycleState.Started)
lifecycleEventSource.start(eventCallback)
testScheduler.triggerActions()

// When
lifecycleEventSource.stop()
lifecycleRegistry.onNext(LifecycleState.Stopped)
testScheduler.triggerActions()

// Then
then(eventCallback).should(only())
.onEvent(Event.OnLifecycleStateChange(LifecycleState.Started))
}

@Test
fun pause_givenLifecycleState_shouldNotNotify() {
// Given
lifecycleRegistry.onNext(LifecycleState.Started)
lifecycleEventSource.start(eventCallback)
testScheduler.triggerActions()

// When
lifecycleEventSource.pause()
lifecycleRegistry.onNext(LifecycleState.Stopped)
testScheduler.triggerActions()

// Then
then(eventCallback).should(only())
.onEvent(Event.OnLifecycleStateChange(LifecycleState.Started))
}

@Test
fun resume_givenLifecycleState_shouldNotify() {
// Given
lifecycleRegistry.onNext(LifecycleState.Started)
lifecycleEventSource.start(eventCallback)
testScheduler.triggerActions()

lifecycleEventSource.pause()
lifecycleRegistry.onNext(LifecycleState.Stopped)

// When
lifecycleEventSource.resume()
testScheduler.triggerActions()

// Then
then(eventCallback).should().onEvent(Event.OnLifecycleStateChange(LifecycleState.Started))
then(eventCallback).should().onEvent(Event.OnLifecycleStateChange(LifecycleState.Stopped))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* © 2019 Match Group, LLC.
*/

package com.tinder.scarlet.internal.coordinator

// TODO
class SessionTest
Loading

0 comments on commit bcf69f4

Please sign in to comment.