From 9433b84ae4ed10257a0133100c4f9992af902bcb Mon Sep 17 00:00:00 2001 From: Greg Giacovelli Date: Fri, 17 Dec 2021 18:39:30 -0800 Subject: [PATCH] scarlet-stream-adapter-coroutines produces hot flow and hot receive channel Fixes #169 Fixes #163 Fixes #27 --- .../coroutines/ChannelForwarder.kt | 29 ++++++ .../CoroutinesStreamAdapterFactory.kt | 1 + .../coroutines/FlowStreamAdapter.kt | 8 +- .../coroutines/ReceiveChannelAdapter.kt | 23 +---- .../coroutines/ClientServerModel.kt | 90 +++++++++++++++++ .../coroutines/FlowStreamAdapterTest.kt | 97 +++++++++++++++++++ .../coroutines/ReceiveChannelTest.kt | 77 ++------------- 7 files changed, 232 insertions(+), 93 deletions(-) create mode 100644 scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ChannelForwarder.kt create mode 100644 scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/ClientServerModel.kt create mode 100644 scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/FlowStreamAdapterTest.kt diff --git a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ChannelForwarder.kt b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ChannelForwarder.kt new file mode 100644 index 00000000..d1c1f546 --- /dev/null +++ b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ChannelForwarder.kt @@ -0,0 +1,29 @@ +package com.tinder.streamadapter.coroutines + +import com.tinder.scarlet.Stream +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.sendBlocking + +internal class ChannelForwarder : Stream.Observer { + private val _channel = Channel() + val channel: ReceiveChannel = _channel + + fun start(stream: Stream): ReceiveChannel { + stream.start(this) + return channel + } + + override fun onComplete() { + _channel.close() + } + + override fun onError(throwable: Throwable) { + _channel.close(throwable) + } + + override fun onNext(data: T) { + println("$data is ") + _channel.sendBlocking(data) + } +} diff --git a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/CoroutinesStreamAdapterFactory.kt b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/CoroutinesStreamAdapterFactory.kt index 0bb987d4..c5cdd04c 100644 --- a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/CoroutinesStreamAdapterFactory.kt +++ b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/CoroutinesStreamAdapterFactory.kt @@ -16,6 +16,7 @@ import java.lang.reflect.Type class CoroutinesStreamAdapterFactory : StreamAdapter.Factory { override fun create(type: Type): StreamAdapter { + println("RAW TYPE = $type") return when (type.getRawType()) { Flow::class.java -> FlowStreamAdapter() ReceiveChannel::class.java -> ReceiveChannelAdapter() diff --git a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/FlowStreamAdapter.kt b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/FlowStreamAdapter.kt index d008b3a9..1b192e8b 100644 --- a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/FlowStreamAdapter.kt +++ b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/FlowStreamAdapter.kt @@ -7,11 +7,15 @@ package com.tinder.streamadapter.coroutines import com.tinder.scarlet.Stream import com.tinder.scarlet.StreamAdapter import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.receiveAsFlow class FlowStreamAdapter : StreamAdapter> { override fun adapt(stream: Stream): Flow { - return (stream as Stream).asFlow() as Flow + val channelForwarder = ChannelForwarder() + return channelForwarder.start(stream).receiveAsFlow().onEach { + println("message + $it") + } } } diff --git a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelAdapter.kt b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelAdapter.kt index 4b76baf2..38a1a468 100644 --- a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelAdapter.kt +++ b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelAdapter.kt @@ -2,33 +2,12 @@ package com.tinder.streamadapter.coroutines import com.tinder.scarlet.Stream import com.tinder.scarlet.StreamAdapter -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.channels.sendBlocking class ReceiveChannelAdapter : StreamAdapter> { override fun adapt(stream: Stream): ReceiveChannel { val channelForwarder = ChannelForwarder() - stream.start(channelForwarder) - return channelForwarder.channel - } - - private class ChannelForwarder : Stream.Observer { - private val _channel = Channel() - val channel: ReceiveChannel = _channel - - override fun onComplete() { - _channel.close() - } - - override fun onError(throwable: Throwable) { - _channel.close(throwable) - } - - override fun onNext(data: T) { - println("$data is ") - _channel.sendBlocking(data) - } + return channelForwarder.start(stream) } } diff --git a/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/ClientServerModel.kt b/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/ClientServerModel.kt new file mode 100644 index 00000000..336f2801 --- /dev/null +++ b/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/ClientServerModel.kt @@ -0,0 +1,90 @@ +package com.tinder.scarlet.streamadapter.coroutines + +import com.tinder.scarlet.Lifecycle +import com.tinder.scarlet.Scarlet +import com.tinder.scarlet.WebSocket +import com.tinder.scarlet.lifecycle.LifecycleRegistry +import com.tinder.scarlet.testutils.TestStreamObserver +import com.tinder.scarlet.testutils.ValueAssert +import com.tinder.scarlet.testutils.any +import com.tinder.scarlet.websocket.mockwebserver.newWebSocketFactory +import com.tinder.scarlet.websocket.okhttp.newWebSocketFactory +import com.tinder.streamadapter.coroutines.CoroutinesStreamAdapterFactory +import okhttp3.OkHttpClient +import okhttp3.mockwebserver.MockWebServer +import org.junit.rules.ExternalResource +import java.util.concurrent.TimeUnit + +abstract class ClientServerModel(private val clazz: Class) : ExternalResource() { + + val mockWebServer = MockWebServer() + private val serverUrlString by lazy { mockWebServer.url("/").toString() } + + private val serverLifecycleRegistry = LifecycleRegistry() + private lateinit var server: T + private lateinit var serverEventObserver: TestStreamObserver + + private val clientLifecycleRegistry = LifecycleRegistry() + private lateinit var client: T + private lateinit var clientEventObserver: TestStreamObserver + + internal fun givenConnectionIsEstablished(): Pair { + createClientAndServer() + serverLifecycleRegistry.onNext(Lifecycle.State.Started) + clientLifecycleRegistry.onNext(Lifecycle.State.Started) + blockUntilConnectionIsEstablish() + return client to server + } + + internal fun createClientAndServer() { + server = createServer() + client = createClient() + clientEventObserver = observeWebSocketEvents(client) // client.observeEvents().test() + serverEventObserver = observeWebSocketEvents(server) // server.observeEvents().test() + } + abstract fun observeWebSocketEvents(service: T): TestStreamObserver + internal fun createServer(): T { + val webSocketFactory = mockWebServer.newWebSocketFactory() + val scarlet = Scarlet.Builder() + .webSocketFactory(webSocketFactory) + .lifecycle(serverLifecycleRegistry) + .addStreamAdapterFactory(CoroutinesStreamAdapterFactory()) + .build() + return scarlet.create(clazz) + } + + internal fun createClient(): T { + val okHttpClient = OkHttpClient.Builder() + .writeTimeout(500, TimeUnit.MILLISECONDS) + .readTimeout(500, TimeUnit.MILLISECONDS) + .build() + val webSocketFactory = okHttpClient.newWebSocketFactory(serverUrlString) + val scarlet = Scarlet.Builder() + .webSocketFactory(webSocketFactory) + .lifecycle(clientLifecycleRegistry) + .addStreamAdapterFactory(CoroutinesStreamAdapterFactory()) + .build() + return scarlet.create(clazz) + } + + internal fun awaitValues(vararg values: ValueAssert) { + serverEventObserver.awaitValues(*values) + } + + internal fun blockUntilConnectionIsEstablish() { + clientEventObserver.awaitValues( + any>() + ) + serverEventObserver.awaitValues( + any>() + ) + } + + override fun after() { + mockWebServer.shutdown() + } + + override fun before() { + mockWebServer.start() + } +} diff --git a/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/FlowStreamAdapterTest.kt b/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/FlowStreamAdapterTest.kt new file mode 100644 index 00000000..28f62b11 --- /dev/null +++ b/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/FlowStreamAdapterTest.kt @@ -0,0 +1,97 @@ +package com.tinder.scarlet.streamadapter.coroutines + +import com.google.common.truth.Correspondence +import com.google.common.truth.Truth.assertThat +import com.tinder.scarlet.Stream +import com.tinder.scarlet.WebSocket +import com.tinder.scarlet.testutils.TestStreamObserver +import com.tinder.scarlet.testutils.any +import com.tinder.scarlet.testutils.containingBytes +import com.tinder.scarlet.testutils.containingText +import com.tinder.scarlet.testutils.test +import com.tinder.scarlet.ws.Receive +import com.tinder.scarlet.ws.Send +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.junit.Rule +import java.util.Arrays + +class FlowStreamAdapterTest { + + @get:Rule + val clientServerModel = object : ClientServerModel(Service::class.java) { + override fun observeWebSocketEvents(service: Service): TestStreamObserver { + return service.observeEvents().test() + } + } + + fun `adapt - given a stream of strings, provides a Flow interface bound to the stream`() = + runBlocking { + // Given + val (client, server) = clientServerModel.givenConnectionIsEstablished() + val textMessage1 = "Hello" + val textMessage2 = "Hi!" + val bytesMessage1 = "Yo".toByteArray() + val bytesMessage2 = "Sup".toByteArray() + val testTextChannel = server.observeText() + val testBytesChannel = server.observeBytes() + + // When + client.sendText(textMessage1) + val isSendTextSuccessful = client.sendTextAndConfirm(textMessage2) + client.sendBytes(bytesMessage1) + val isSendBytesSuccessful = client.sendBytesAndConfirm(bytesMessage2) + + // Then + assertThat(isSendBytesSuccessful).isTrue() + assertThat(isSendTextSuccessful).isTrue() + + clientServerModel.awaitValues( + any>(), + any().containingText(textMessage1), + any().containingText(textMessage2), + any().containingBytes(bytesMessage1), + any().containingBytes(bytesMessage2) + ) + + assertThat(testTextChannel.take(2).toList()).containsExactly( + textMessage1, + textMessage2 + ).inOrder() + + val bytes = testBytesChannel.take(2).toList() + assertThat(bytes).comparingElementsUsing(BYTE_ARRAY_CORRESPONDENCE).containsExactly( + bytesMessage1, + bytesMessage2 + ).inOrder() + } + + val BYTE_ARRAY_CORRESPONDENCE = Correspondence.from({ actual, expected -> + Arrays.compare(actual, expected) == 0 + }, "Compare using Arrays.equals") + + interface Service { + @Receive + fun observeEvents(): Stream + + @Receive + fun observeText(): Flow + + @Receive + fun observeBytes(): Flow + + @Send + fun sendText(message: String) + + @Send + fun sendTextAndConfirm(message: String): Boolean + + @Send + fun sendBytes(message: ByteArray) + + @Send + fun sendBytesAndConfirm(message: ByteArray): Boolean + } +} diff --git a/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/ReceiveChannelTest.kt b/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/ReceiveChannelTest.kt index bcca4cdc..90e633d2 100644 --- a/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/ReceiveChannelTest.kt +++ b/scarlet-stream-adapter-coroutines/src/test/java/com/tinder/scarlet/streamadapter/coroutines/ReceiveChannelTest.kt @@ -5,46 +5,32 @@ package com.tinder.scarlet.streamadapter.coroutines import com.google.common.truth.Truth.assertThat -import com.tinder.scarlet.Lifecycle -import com.tinder.scarlet.Scarlet import com.tinder.scarlet.Stream import com.tinder.scarlet.WebSocket -import com.tinder.scarlet.lifecycle.LifecycleRegistry import com.tinder.scarlet.testutils.TestStreamObserver import com.tinder.scarlet.testutils.any import com.tinder.scarlet.testutils.containingBytes import com.tinder.scarlet.testutils.containingText import com.tinder.scarlet.testutils.test -import com.tinder.scarlet.websocket.mockwebserver.newWebSocketFactory -import com.tinder.scarlet.websocket.okhttp.newWebSocketFactory import com.tinder.scarlet.ws.Receive import com.tinder.scarlet.ws.Send -import com.tinder.streamadapter.coroutines.CoroutinesStreamAdapterFactory import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.runBlocking -import okhttp3.OkHttpClient -import okhttp3.mockwebserver.MockWebServer import org.junit.Rule import org.junit.Test -import java.util.concurrent.TimeUnit class ReceiveChannelTest { - @get:Rule val mockWebServer = MockWebServer() - private val serverUrlString by lazy { mockWebServer.url("/").toString() } - - private val serverLifecycleRegistry = LifecycleRegistry() - private lateinit var server: Service - private lateinit var serverEventObserver: TestStreamObserver - - private val clientLifecycleRegistry = LifecycleRegistry() - private lateinit var client: Service - private lateinit var clientEventObserver: TestStreamObserver + @get:Rule val clientServerModel = object : ClientServerModel(Service::class.java) { + override fun observeWebSocketEvents(service: Service): TestStreamObserver { + return service.observeEvents().test() + } + } @Test fun send_givenConnectionIsEstablished_shouldBeReceivedByTheServer() { // Given - givenConnectionIsEstablished() + val (client, server) = clientServerModel.givenConnectionIsEstablished() val textMessage1 = "Hello" val textMessage2 = "Hi!" val bytesMessage1 = "Yo".toByteArray() @@ -62,7 +48,7 @@ class ReceiveChannelTest { assertThat(isSendTextSuccessful).isTrue() assertThat(isSendBytesSuccessful).isTrue() - serverEventObserver.awaitValues( + clientServerModel.awaitValues( any>(), any().containingText(textMessage1), any().containingText(textMessage2), @@ -79,54 +65,7 @@ class ReceiveChannelTest { } } - private fun givenConnectionIsEstablished() { - createClientAndServer() - serverLifecycleRegistry.onNext(Lifecycle.State.Started) - clientLifecycleRegistry.onNext(Lifecycle.State.Started) - blockUntilConnectionIsEstablish() - } - - private fun createClientAndServer() { - server = createServer() - serverEventObserver = server.observeEvents().test() - client = createClient() - clientEventObserver = client.observeEvents().test() - } - - private fun createServer(): Service { - val webSocketFactory = mockWebServer.newWebSocketFactory() - val scarlet = Scarlet.Builder() - .webSocketFactory(webSocketFactory) - .lifecycle(serverLifecycleRegistry) - .addStreamAdapterFactory(CoroutinesStreamAdapterFactory()) - .build() - return scarlet.create() - } - - private fun createClient(): Service { - val okHttpClient = OkHttpClient.Builder() - .writeTimeout(500, TimeUnit.MILLISECONDS) - .readTimeout(500, TimeUnit.MILLISECONDS) - .build() - val webSocketFactory = okHttpClient.newWebSocketFactory(serverUrlString) - val scarlet = Scarlet.Builder() - .webSocketFactory(webSocketFactory) - .lifecycle(clientLifecycleRegistry) - .addStreamAdapterFactory(CoroutinesStreamAdapterFactory()) - .build() - return scarlet.create() - } - - private fun blockUntilConnectionIsEstablish() { - clientEventObserver.awaitValues( - any>() - ) - serverEventObserver.awaitValues( - any>() - ) - } - - private interface Service { + interface Service { @Receive fun observeEvents(): Stream