diff --git a/build.gradle b/build.gradle index 0cbaf59b..76416adf 100644 --- a/build.gradle +++ b/build.gradle @@ -1,7 +1,7 @@ // Top-level build file where you can add configuration options common to all sub-projects/modules. buildscript { - ext.kotlin_version = '1.4.31' + ext.kotlin_version = '1.5.30' ext.dokka_version = '0.9.16' repositories { mavenCentral() @@ -10,7 +10,7 @@ buildscript { } dependencies { classpath 'com.vanniktech:gradle-maven-publish-plugin:0.8.0' - classpath 'com.android.tools.build:gradle:4.1.2' + classpath 'com.android.tools.build:gradle:4.1.3' classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" } } diff --git a/demo/build.gradle b/demo/build.gradle index 949c01f5..6c0fac55 100755 --- a/demo/build.gradle +++ b/demo/build.gradle @@ -33,6 +33,7 @@ dependencies { implementation project(':scarlet-message-adapter-moshi') implementation project(':scarlet-message-adapter-protobuf') implementation project(':scarlet-stream-adapter-rxjava2') + implementation project(':scarlet-stream-adapter-coroutines') implementation libs.appCompat implementation libs.material diff --git a/demo/src/main/java/com/tinder/app/echo/api/EchoService.kt b/demo/src/main/java/com/tinder/app/echo/api/EchoService.kt index a13580c5..4cc84e47 100755 --- a/demo/src/main/java/com/tinder/app/echo/api/EchoService.kt +++ b/demo/src/main/java/com/tinder/app/echo/api/EchoService.kt @@ -9,20 +9,20 @@ import com.tinder.scarlet.Event import com.tinder.scarlet.State import com.tinder.scarlet.ws.Receive import com.tinder.scarlet.ws.Send -import io.reactivex.Flowable +import kotlinx.coroutines.flow.Flow interface EchoService { @Receive - fun observeState(): Flowable + fun observeState(): Flow @Receive - fun observeEvent(): Flowable + fun observeEvent(): Flow @Receive - fun observeText(): Flowable + fun observeText(): Flow @Receive - fun observeBitmap(): Flowable + fun observeBitmap(): Flow @Send fun sendText(message: String): Boolean diff --git a/demo/src/main/java/com/tinder/app/echo/domain/ChatMessageRepository.kt b/demo/src/main/java/com/tinder/app/echo/domain/ChatMessageRepository.kt index c9d74349..aa6013bc 100644 --- a/demo/src/main/java/com/tinder/app/echo/domain/ChatMessageRepository.kt +++ b/demo/src/main/java/com/tinder/app/echo/domain/ChatMessageRepository.kt @@ -13,7 +13,10 @@ import com.tinder.scarlet.State import com.tinder.scarlet.WebSocket import io.reactivex.Flowable import io.reactivex.processors.BehaviorProcessor -import io.reactivex.schedulers.Schedulers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import org.joda.time.DateTime import timber.log.Timber import java.util.concurrent.atomic.AtomicInteger @@ -30,8 +33,7 @@ class ChatMessageRepository @Inject constructor( init { echoService.observeEvent() - .observeOn(Schedulers.io()) - .subscribe({ event -> + .onEach { event -> val description = when (event) { is Event.OnLifecycle.StateChange<*> -> when (event.state) { Lifecycle.State.Started -> "\uD83C\uDF1D On Lifecycle Start" @@ -57,15 +59,15 @@ class ChatMessageRepository @Inject constructor( } Event.OnRetry -> "⏰ On Retry" } - val chatMessage = ChatMessage.Text(generateMessageId(), description, ChatMessage.Source.RECEIVED) + val chatMessage = + ChatMessage.Text(generateMessageId(), description, ChatMessage.Source.RECEIVED) addChatMessage(chatMessage) - }, { e -> + }.catch { e -> Timber.e(e) - }) + }.launchIn(GlobalScope) echoService.observeText() - .observeOn(Schedulers.io()) - .subscribe({ text -> + .onEach { text -> val chatMessage = ChatMessage.Text( generateMessageId(), text, @@ -73,13 +75,12 @@ class ChatMessageRepository @Inject constructor( DateTime.now().plusMillis(50) ) addChatMessage(chatMessage) - }, { e -> + }.catch { e -> Timber.e(e) - }) + }.launchIn(GlobalScope) echoService.observeBitmap() - .observeOn(Schedulers.io()) - .subscribe({ bitmap -> + .onEach { bitmap -> val chatMessage = ChatMessage.Image( generateMessageId(), bitmap, @@ -87,9 +88,9 @@ class ChatMessageRepository @Inject constructor( DateTime.now().plusMillis(50) ) addChatMessage(chatMessage) - }, { e -> + }.catch { e -> Timber.e(e) - }) + }.launchIn(GlobalScope) } fun observeChatMessage(): Flowable> = messagesProcessor diff --git a/demo/src/main/java/com/tinder/app/echo/inject/EchoBotComponent.kt b/demo/src/main/java/com/tinder/app/echo/inject/EchoBotComponent.kt index 3312e47b..e64f7912 100644 --- a/demo/src/main/java/com/tinder/app/echo/inject/EchoBotComponent.kt +++ b/demo/src/main/java/com/tinder/app/echo/inject/EchoBotComponent.kt @@ -14,6 +14,7 @@ import com.tinder.scarlet.Scarlet import com.tinder.scarlet.lifecycle.android.AndroidLifecycle import com.tinder.scarlet.streamadapter.rxjava2.RxJava2StreamAdapterFactory import com.tinder.scarlet.websocket.okhttp.newWebSocketFactory +import com.tinder.streamadapter.coroutines.CoroutinesStreamAdapterFactory import dagger.Component import dagger.Module import dagger.Provides @@ -59,6 +60,7 @@ interface EchoBotComponent { .lifecycle(lifecycle) .addMessageAdapterFactory(BitmapMessageAdapter.Factory()) .addStreamAdapterFactory(RxJava2StreamAdapterFactory()) + .addStreamAdapterFactory(CoroutinesStreamAdapterFactory()) .build() return scarlet.create() } diff --git a/demo/src/main/java/com/tinder/app/gdax/view/GdaxFragment.kt b/demo/src/main/java/com/tinder/app/gdax/view/GdaxFragment.kt index fe2d27b8..a8bc4039 100644 --- a/demo/src/main/java/com/tinder/app/gdax/view/GdaxFragment.kt +++ b/demo/src/main/java/com/tinder/app/gdax/view/GdaxFragment.kt @@ -101,7 +101,7 @@ class GdaxFragment : Fragment(), GdaxTarget { setDrawValues(false) } - val minPrice = priceEntries.minBy { it.y }?.y ?: 0F + val minPrice = priceEntries.minByOrNull { it.y }?.y ?: 0F val minPriceEntries = listOf( Entry(minutesAgo.millisOfDay.toFloat(), minPrice), Entry(now.millisOfDay.toFloat(), minPrice) diff --git a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/CoroutinesStreamAdapterFactory.kt b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/CoroutinesStreamAdapterFactory.kt index 55780e09..0bb987d4 100644 --- a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/CoroutinesStreamAdapterFactory.kt +++ b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/CoroutinesStreamAdapterFactory.kt @@ -7,6 +7,7 @@ package com.tinder.streamadapter.coroutines import com.tinder.scarlet.StreamAdapter import com.tinder.scarlet.utils.getRawType import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.flow.Flow import java.lang.reflect.Type /** @@ -16,8 +17,9 @@ class CoroutinesStreamAdapterFactory : StreamAdapter.Factory { override fun create(type: Type): StreamAdapter { return when (type.getRawType()) { - ReceiveChannel::class.java -> ReceiveChannelStreamAdapter() + Flow::class.java -> FlowStreamAdapter() + ReceiveChannel::class.java -> ReceiveChannelAdapter() else -> throw IllegalArgumentException() } } -} \ No newline at end of file +} diff --git a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/FlowStreamAdapter.kt b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/FlowStreamAdapter.kt new file mode 100644 index 00000000..d008b3a9 --- /dev/null +++ b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/FlowStreamAdapter.kt @@ -0,0 +1,17 @@ +/* + * © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED + */ + +package com.tinder.streamadapter.coroutines + +import com.tinder.scarlet.Stream +import com.tinder.scarlet.StreamAdapter +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.reactive.asFlow + +class FlowStreamAdapter : StreamAdapter> { + + override fun adapt(stream: Stream): Flow { + return (stream as Stream).asFlow() as Flow + } +} diff --git a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelAdapter.kt b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelAdapter.kt new file mode 100644 index 00000000..a103a524 --- /dev/null +++ b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelAdapter.kt @@ -0,0 +1,33 @@ +package com.tinder.streamadapter.coroutines + +import com.tinder.scarlet.Stream +import com.tinder.scarlet.StreamAdapter +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.sendBlocking + +class ReceiveChannelAdapter : StreamAdapter> { + + override fun adapt(stream: Stream): ReceiveChannel { + val channelForwarder = ChannelForwarder() + stream.start(channelForwarder) + return channelForwarder.channel + } + + private class ChannelForwarder : Stream.Observer { + private val _channel = Channel() + val channel: ReceiveChannel = _channel + + override fun onComplete() { + _channel.close() + } + + override fun onError(throwable: Throwable) { + _channel.close(throwable) + } + + override fun onNext(data: T) { + _channel.sendBlocking(data) + } + } +} diff --git a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelStreamAdapter.kt b/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelStreamAdapter.kt deleted file mode 100644 index cb47b005..00000000 --- a/scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/ReceiveChannelStreamAdapter.kt +++ /dev/null @@ -1,15 +0,0 @@ -/* - * © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED - */ - -package com.tinder.streamadapter.coroutines - -import com.tinder.scarlet.Stream -import com.tinder.scarlet.StreamAdapter -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.reactive.openSubscription - -class ReceiveChannelStreamAdapter : StreamAdapter> { - - override fun adapt(stream: Stream) = stream.openSubscription() -} \ No newline at end of file