Skip to content

Commit

Permalink
scarlet-stream-adapter-coroutines produces hot flow and hot receive c…
Browse files Browse the repository at this point in the history
…hannel

Fixes Tinder#169
Fixes Tinder#163
Fixes Tinder#27
  • Loading branch information
greggiacovelli committed Jan 4, 2022
1 parent 952bdba commit 9433b84
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.sendBlocking

internal class ChannelForwarder<T> : Stream.Observer<T> {
private val _channel = Channel<T>()
val channel: ReceiveChannel<T> = _channel

fun start(stream: Stream<T>): ReceiveChannel<T> {
stream.start(this)
return channel
}

override fun onComplete() {
_channel.close()
}

override fun onError(throwable: Throwable) {
_channel.close(throwable)
}

override fun onNext(data: T) {
println("$data is ")
_channel.sendBlocking(data)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import java.lang.reflect.Type
class CoroutinesStreamAdapterFactory : StreamAdapter.Factory {

override fun create(type: Type): StreamAdapter<Any, Any> {
println("RAW TYPE = $type")
return when (type.getRawType()) {
Flow::class.java -> FlowStreamAdapter()
ReceiveChannel::class.java -> ReceiveChannelAdapter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ package com.tinder.streamadapter.coroutines
import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow

class FlowStreamAdapter<T> : StreamAdapter<T, Flow<T>> {

override fun adapt(stream: Stream<T>): Flow<T> {
return (stream as Stream<Any>).asFlow() as Flow<T>
val channelForwarder = ChannelForwarder<T>()
return channelForwarder.start(stream).receiveAsFlow().onEach {
println("message + $it")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,12 @@ package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.sendBlocking

class ReceiveChannelAdapter<T> : StreamAdapter<T, ReceiveChannel<T>> {

override fun adapt(stream: Stream<T>): ReceiveChannel<T> {
val channelForwarder = ChannelForwarder<T>()
stream.start(channelForwarder)
return channelForwarder.channel
}

private class ChannelForwarder<T> : Stream.Observer<T> {
private val _channel = Channel<T>()
val channel: ReceiveChannel<T> = _channel

override fun onComplete() {
_channel.close()
}

override fun onError(throwable: Throwable) {
_channel.close(throwable)
}

override fun onNext(data: T) {
println("$data is ")
_channel.sendBlocking(data)
}
return channelForwarder.start(stream)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.tinder.scarlet.streamadapter.coroutines

import com.tinder.scarlet.Lifecycle
import com.tinder.scarlet.Scarlet
import com.tinder.scarlet.WebSocket
import com.tinder.scarlet.lifecycle.LifecycleRegistry
import com.tinder.scarlet.testutils.TestStreamObserver
import com.tinder.scarlet.testutils.ValueAssert
import com.tinder.scarlet.testutils.any
import com.tinder.scarlet.websocket.mockwebserver.newWebSocketFactory
import com.tinder.scarlet.websocket.okhttp.newWebSocketFactory
import com.tinder.streamadapter.coroutines.CoroutinesStreamAdapterFactory
import okhttp3.OkHttpClient
import okhttp3.mockwebserver.MockWebServer
import org.junit.rules.ExternalResource
import java.util.concurrent.TimeUnit

abstract class ClientServerModel<T : Any>(private val clazz: Class<T>) : ExternalResource() {

val mockWebServer = MockWebServer()
private val serverUrlString by lazy { mockWebServer.url("/").toString() }

private val serverLifecycleRegistry = LifecycleRegistry()
private lateinit var server: T
private lateinit var serverEventObserver: TestStreamObserver<WebSocket.Event>

private val clientLifecycleRegistry = LifecycleRegistry()
private lateinit var client: T
private lateinit var clientEventObserver: TestStreamObserver<WebSocket.Event>

internal fun givenConnectionIsEstablished(): Pair<T, T> {
createClientAndServer()
serverLifecycleRegistry.onNext(Lifecycle.State.Started)
clientLifecycleRegistry.onNext(Lifecycle.State.Started)
blockUntilConnectionIsEstablish()
return client to server
}

internal fun createClientAndServer() {
server = createServer()
client = createClient()
clientEventObserver = observeWebSocketEvents(client) // client.observeEvents().test()
serverEventObserver = observeWebSocketEvents(server) // server.observeEvents().test()
}
abstract fun observeWebSocketEvents(service: T): TestStreamObserver<WebSocket.Event>
internal fun createServer(): T {
val webSocketFactory = mockWebServer.newWebSocketFactory()
val scarlet = Scarlet.Builder()
.webSocketFactory(webSocketFactory)
.lifecycle(serverLifecycleRegistry)
.addStreamAdapterFactory(CoroutinesStreamAdapterFactory())
.build()
return scarlet.create(clazz)
}

internal fun createClient(): T {
val okHttpClient = OkHttpClient.Builder()
.writeTimeout(500, TimeUnit.MILLISECONDS)
.readTimeout(500, TimeUnit.MILLISECONDS)
.build()
val webSocketFactory = okHttpClient.newWebSocketFactory(serverUrlString)
val scarlet = Scarlet.Builder()
.webSocketFactory(webSocketFactory)
.lifecycle(clientLifecycleRegistry)
.addStreamAdapterFactory(CoroutinesStreamAdapterFactory())
.build()
return scarlet.create(clazz)
}

internal fun awaitValues(vararg values: ValueAssert<Any>) {
serverEventObserver.awaitValues(*values)
}

internal fun blockUntilConnectionIsEstablish() {
clientEventObserver.awaitValues(
any<WebSocket.Event.OnConnectionOpened<*>>()
)
serverEventObserver.awaitValues(
any<WebSocket.Event.OnConnectionOpened<*>>()
)
}

override fun after() {
mockWebServer.shutdown()
}

override fun before() {
mockWebServer.start()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.tinder.scarlet.streamadapter.coroutines

import com.google.common.truth.Correspondence
import com.google.common.truth.Truth.assertThat
import com.tinder.scarlet.Stream
import com.tinder.scarlet.WebSocket
import com.tinder.scarlet.testutils.TestStreamObserver
import com.tinder.scarlet.testutils.any
import com.tinder.scarlet.testutils.containingBytes
import com.tinder.scarlet.testutils.containingText
import com.tinder.scarlet.testutils.test
import com.tinder.scarlet.ws.Receive
import com.tinder.scarlet.ws.Send
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.junit.Rule
import java.util.Arrays

class FlowStreamAdapterTest {

@get:Rule
val clientServerModel = object : ClientServerModel<Service>(Service::class.java) {
override fun observeWebSocketEvents(service: Service): TestStreamObserver<WebSocket.Event> {
return service.observeEvents().test()
}
}

fun `adapt - given a stream of strings, provides a Flow interface bound to the stream`() =
runBlocking {
// Given
val (client, server) = clientServerModel.givenConnectionIsEstablished()
val textMessage1 = "Hello"
val textMessage2 = "Hi!"
val bytesMessage1 = "Yo".toByteArray()
val bytesMessage2 = "Sup".toByteArray()
val testTextChannel = server.observeText()
val testBytesChannel = server.observeBytes()

// When
client.sendText(textMessage1)
val isSendTextSuccessful = client.sendTextAndConfirm(textMessage2)
client.sendBytes(bytesMessage1)
val isSendBytesSuccessful = client.sendBytesAndConfirm(bytesMessage2)

// Then
assertThat(isSendBytesSuccessful).isTrue()
assertThat(isSendTextSuccessful).isTrue()

clientServerModel.awaitValues(
any<WebSocket.Event.OnConnectionOpened<*>>(),
any<WebSocket.Event.OnMessageReceived>().containingText(textMessage1),
any<WebSocket.Event.OnMessageReceived>().containingText(textMessage2),
any<WebSocket.Event.OnMessageReceived>().containingBytes(bytesMessage1),
any<WebSocket.Event.OnMessageReceived>().containingBytes(bytesMessage2)
)

assertThat(testTextChannel.take(2).toList()).containsExactly(
textMessage1,
textMessage2
).inOrder()

val bytes = testBytesChannel.take(2).toList()
assertThat(bytes).comparingElementsUsing(BYTE_ARRAY_CORRESPONDENCE).containsExactly(
bytesMessage1,
bytesMessage2
).inOrder()
}

val BYTE_ARRAY_CORRESPONDENCE = Correspondence.from<ByteArray, ByteArray>({ actual, expected ->
Arrays.compare(actual, expected) == 0
}, "Compare using Arrays.equals")

interface Service {
@Receive
fun observeEvents(): Stream<WebSocket.Event>

@Receive
fun observeText(): Flow<String>

@Receive
fun observeBytes(): Flow<ByteArray>

@Send
fun sendText(message: String)

@Send
fun sendTextAndConfirm(message: String): Boolean

@Send
fun sendBytes(message: ByteArray)

@Send
fun sendBytesAndConfirm(message: ByteArray): Boolean
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,32 @@
package com.tinder.scarlet.streamadapter.coroutines

import com.google.common.truth.Truth.assertThat
import com.tinder.scarlet.Lifecycle
import com.tinder.scarlet.Scarlet
import com.tinder.scarlet.Stream
import com.tinder.scarlet.WebSocket
import com.tinder.scarlet.lifecycle.LifecycleRegistry
import com.tinder.scarlet.testutils.TestStreamObserver
import com.tinder.scarlet.testutils.any
import com.tinder.scarlet.testutils.containingBytes
import com.tinder.scarlet.testutils.containingText
import com.tinder.scarlet.testutils.test
import com.tinder.scarlet.websocket.mockwebserver.newWebSocketFactory
import com.tinder.scarlet.websocket.okhttp.newWebSocketFactory
import com.tinder.scarlet.ws.Receive
import com.tinder.scarlet.ws.Send
import com.tinder.streamadapter.coroutines.CoroutinesStreamAdapterFactory
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import okhttp3.OkHttpClient
import okhttp3.mockwebserver.MockWebServer
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.TimeUnit

class ReceiveChannelTest {

@get:Rule val mockWebServer = MockWebServer()
private val serverUrlString by lazy { mockWebServer.url("/").toString() }

private val serverLifecycleRegistry = LifecycleRegistry()
private lateinit var server: Service
private lateinit var serverEventObserver: TestStreamObserver<WebSocket.Event>

private val clientLifecycleRegistry = LifecycleRegistry()
private lateinit var client: Service
private lateinit var clientEventObserver: TestStreamObserver<WebSocket.Event>
@get:Rule val clientServerModel = object : ClientServerModel<Service>(Service::class.java) {
override fun observeWebSocketEvents(service: Service): TestStreamObserver<WebSocket.Event> {
return service.observeEvents().test()
}
}

@Test
fun send_givenConnectionIsEstablished_shouldBeReceivedByTheServer() {
// Given
givenConnectionIsEstablished()
val (client, server) = clientServerModel.givenConnectionIsEstablished()
val textMessage1 = "Hello"
val textMessage2 = "Hi!"
val bytesMessage1 = "Yo".toByteArray()
Expand All @@ -62,7 +48,7 @@ class ReceiveChannelTest {
assertThat(isSendTextSuccessful).isTrue()
assertThat(isSendBytesSuccessful).isTrue()

serverEventObserver.awaitValues(
clientServerModel.awaitValues(
any<WebSocket.Event.OnConnectionOpened<*>>(),
any<WebSocket.Event.OnMessageReceived>().containingText(textMessage1),
any<WebSocket.Event.OnMessageReceived>().containingText(textMessage2),
Expand All @@ -79,54 +65,7 @@ class ReceiveChannelTest {
}
}

private fun givenConnectionIsEstablished() {
createClientAndServer()
serverLifecycleRegistry.onNext(Lifecycle.State.Started)
clientLifecycleRegistry.onNext(Lifecycle.State.Started)
blockUntilConnectionIsEstablish()
}

private fun createClientAndServer() {
server = createServer()
serverEventObserver = server.observeEvents().test()
client = createClient()
clientEventObserver = client.observeEvents().test()
}

private fun createServer(): Service {
val webSocketFactory = mockWebServer.newWebSocketFactory()
val scarlet = Scarlet.Builder()
.webSocketFactory(webSocketFactory)
.lifecycle(serverLifecycleRegistry)
.addStreamAdapterFactory(CoroutinesStreamAdapterFactory())
.build()
return scarlet.create()
}

private fun createClient(): Service {
val okHttpClient = OkHttpClient.Builder()
.writeTimeout(500, TimeUnit.MILLISECONDS)
.readTimeout(500, TimeUnit.MILLISECONDS)
.build()
val webSocketFactory = okHttpClient.newWebSocketFactory(serverUrlString)
val scarlet = Scarlet.Builder()
.webSocketFactory(webSocketFactory)
.lifecycle(clientLifecycleRegistry)
.addStreamAdapterFactory(CoroutinesStreamAdapterFactory())
.build()
return scarlet.create()
}

private fun blockUntilConnectionIsEstablish() {
clientEventObserver.awaitValues(
any<WebSocket.Event.OnConnectionOpened<*>>()
)
serverEventObserver.awaitValues(
any<WebSocket.Event.OnConnectionOpened<*>>()
)
}

private interface Service {
interface Service {
@Receive
fun observeEvents(): Stream<WebSocket.Event>

Expand Down

0 comments on commit 9433b84

Please sign in to comment.