Skip to content

Commit

Permalink
Updated Kotlin and the libs for coroutines (#200)
Browse files Browse the repository at this point in the history
* Migrated Project to Kotlin 1.5.30

* Updated scarlet-stream-adapter-coroutines to be compatible with 1.5.30 as well regarding use of asFlow and channels

* Updated coroutines to 1.5.2

* scarlet-stream-adapter-coroutines produces hot flow and hot receive channel

Fixes #169
Fixes #163
Fixes #27

* ChannelForwarder now calls dispose correctly on Completed and Error

* Using new trySendBlocking in onNext callback

* Updating Tests to include FlowStreamAdapterTest

* Reving kotlin coroutines 1.6.0

* Updating circleci configuration to stash the correct test reports

* Updating to coroutines 1.4 and higher removes the reactive streams behavior of the past
Relied on a subscription and a buffered observer, adding buffering

* Documentation

* No need to rename files

* Use supervisor job instead of globalscope

* Proper variable naming

* Channel Forwarder should not throw
  • Loading branch information
greggiacovelli authored Jan 30, 2022
1 parent 6d98e7d commit 72dcb2a
Show file tree
Hide file tree
Showing 17 changed files with 400 additions and 191 deletions.
9 changes: 7 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ jobs:
name: Disable PreDexing
command: echo "disablePreDex" >> gradle.properties
- run: if [ -e ./gradlew ]; then ./gradlew dependencies;else gradle dependencies;fi
- run: mkdir -p test-reports/junit/
- run: ./gradlew test
- run: mkdir -p $CIRCLE_TEST_REPORTS/junit/
- run: find . -type f -regex ".*/build/test-results/.*xml" -exec cp {} $CIRCLE_TEST_REPORTS/junit/ \;
- run:
name: gather_test_results
command: find . -type f -regex ".*/build/test-results/.*xml" -exec cp {} test-reports/junit/ \;
when: always
- store_test_results:
path: test-reports
- run:
name: Deploy Snapshot
command: ./publish.sh
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Top-level build file where you can add configuration options common to all sub-projects/modules.

buildscript {
ext.kotlin_version = '1.4.31'
ext.kotlin_version = '1.5.30'
ext.dokka_version = '0.9.16'
repositories {
mavenCentral()
Expand All @@ -10,7 +10,7 @@ buildscript {
}
dependencies {
classpath 'com.vanniktech:gradle-maven-publish-plugin:0.8.0'
classpath 'com.android.tools.build:gradle:4.1.2'
classpath 'com.android.tools.build:gradle:4.1.3'
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
}
Expand Down
1 change: 1 addition & 0 deletions demo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation project(':scarlet-message-adapter-moshi')
implementation project(':scarlet-message-adapter-protobuf')
implementation project(':scarlet-stream-adapter-rxjava2')
implementation project(':scarlet-stream-adapter-coroutines')

implementation libs.appCompat
implementation libs.material
Expand Down
10 changes: 5 additions & 5 deletions demo/src/main/java/com/tinder/app/echo/api/EchoService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ import com.tinder.scarlet.Event
import com.tinder.scarlet.State
import com.tinder.scarlet.ws.Receive
import com.tinder.scarlet.ws.Send
import io.reactivex.Flowable
import kotlinx.coroutines.flow.Flow

interface EchoService {
@Receive
fun observeState(): Flowable<State>
fun observeState(): Flow<State>

@Receive
fun observeEvent(): Flowable<Event>
fun observeEvent(): Flow<Event>

@Receive
fun observeText(): Flowable<String>
fun observeText(): Flow<String>

@Receive
fun observeBitmap(): Flowable<Bitmap>
fun observeBitmap(): Flow<Bitmap>

@Send
fun sendText(message: String): Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import com.tinder.scarlet.State
import com.tinder.scarlet.WebSocket
import io.reactivex.Flowable
import io.reactivex.processors.BehaviorProcessor
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.joda.time.DateTime
import timber.log.Timber
import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -22,16 +25,16 @@ import javax.inject.Inject

@EchoBotScope
class ChatMessageRepository @Inject constructor(
private val echoService: EchoService
private val echoService: EchoService,
private val coroutineScope: CoroutineScope
) {
private val messageCount = AtomicInteger()
private val messagesRef = AtomicReference<List<ChatMessage>>()
private val messagesProcessor = BehaviorProcessor.create<List<ChatMessage>>()

init {
echoService.observeEvent()
.observeOn(Schedulers.io())
.subscribe({ event ->
.onEach { event ->
val description = when (event) {
is Event.OnLifecycle.StateChange<*> -> when (event.state) {
Lifecycle.State.Started -> "\uD83C\uDF1D On Lifecycle Start"
Expand All @@ -57,39 +60,38 @@ class ChatMessageRepository @Inject constructor(
}
Event.OnRetry -> "⏰ On Retry"
}
val chatMessage = ChatMessage.Text(generateMessageId(), description, ChatMessage.Source.RECEIVED)
val chatMessage =
ChatMessage.Text(generateMessageId(), description, ChatMessage.Source.RECEIVED)
addChatMessage(chatMessage)
}, { e ->
}.catch { e ->
Timber.e(e)
})
}.launchIn(coroutineScope)

echoService.observeText()
.observeOn(Schedulers.io())
.subscribe({ text ->
.onEach { text ->
val chatMessage = ChatMessage.Text(
generateMessageId(),
text,
ChatMessage.Source.RECEIVED,
DateTime.now().plusMillis(50)
)
addChatMessage(chatMessage)
}, { e ->
}.catch { e ->
Timber.e(e)
})
}.launchIn(coroutineScope)

echoService.observeBitmap()
.observeOn(Schedulers.io())
.subscribe({ bitmap ->
.onEach { bitmap ->
val chatMessage = ChatMessage.Image(
generateMessageId(),
bitmap,
ChatMessage.Source.RECEIVED,
DateTime.now().plusMillis(50)
)
addChatMessage(chatMessage)
}, { e ->
}.catch { e ->
Timber.e(e)
})
}.launchIn(coroutineScope)
}

fun observeChatMessage(): Flowable<List<ChatMessage>> = messagesProcessor
Expand Down
10 changes: 10 additions & 0 deletions demo/src/main/java/com/tinder/app/echo/inject/EchoBotComponent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import com.tinder.scarlet.Scarlet
import com.tinder.scarlet.lifecycle.android.AndroidLifecycle
import com.tinder.scarlet.streamadapter.rxjava2.RxJava2StreamAdapterFactory
import com.tinder.scarlet.websocket.okhttp.newWebSocketFactory
import com.tinder.streamadapter.coroutines.CoroutinesStreamAdapterFactory
import dagger.Component
import dagger.Module
import dagger.Provides
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import okhttp3.OkHttpClient
import okhttp3.logging.HttpLoggingInterceptor

Expand Down Expand Up @@ -59,9 +62,16 @@ interface EchoBotComponent {
.lifecycle(lifecycle)
.addMessageAdapterFactory(BitmapMessageAdapter.Factory())
.addStreamAdapterFactory(RxJava2StreamAdapterFactory())
.addStreamAdapterFactory(CoroutinesStreamAdapterFactory())
.build()
return scarlet.create()
}

@Provides
@EchoBotScope
fun provideCoroutineScopeToRunIn(): CoroutineScope {
return CoroutineScope(SupervisorJob())
}
}

interface ComponentProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class GdaxFragment : Fragment(), GdaxTarget {
setDrawValues(false)
}

val minPrice = priceEntries.minBy { it.y }?.y ?: 0F
val minPrice = priceEntries.minByOrNull { it.y }?.y ?: 0F
val minPriceEntries = listOf(
Entry(minutesAgo.millisOfDay.toFloat(), minPrice),
Entry(now.millisOfDay.toFloat(), minPrice)
Expand Down
7 changes: 5 additions & 2 deletions dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ ext.versions = [
rxAndroid: '2.0.2',
rxKotlin: '2.2.0',
rxJava1: '1.3.4',
kotlinxCoroutines: '1.4.3',
kotlinxCoroutines: '1.6.0',

stetho: '1.5.0',
stethoOkHttp: '1.5.0',
Expand Down Expand Up @@ -42,6 +42,7 @@ ext.versions = [

junit: '4.12',
assertJ: '3.8.0',
truth: '1.1.3',

stateMachine: "0.2.0"
]
Expand All @@ -61,7 +62,8 @@ ext.libs = [
kotlinx: [
coroutines: [
core: "org.jetbrains.kotlinx:kotlinx-coroutines-core:$versions.kotlinxCoroutines",
reactive: "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$versions.kotlinxCoroutines"
reactive: "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$versions.kotlinxCoroutines",
test: "org.jetbrains.kotlinx:kotlinx-coroutines-test:$versions.kotlinxCoroutines"
]
],

Expand Down Expand Up @@ -100,6 +102,7 @@ ext.libs = [
junit: "junit:junit:$versions.junit",
mockito: "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0",
assertJ: "org.assertj:assertj-core:$versions.assertJ",
truth: "com.google.truth:truth:$versions.truth",

kotlin: [
stdlib: "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version",
Expand Down
3 changes: 2 additions & 1 deletion scarlet-stream-adapter-coroutines/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ dependencies {
testImplementation libs.junit
testImplementation libs.mockito
testImplementation libs.kotlin.reflect
testImplementation libs.assertJ
testImplementation libs.kotlinx.coroutines.test
testImplementation libs.truth
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.onClosed
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.trySendBlocking

internal class ChannelForwarder<T>(bufferSize: Int) : Stream.Observer<T> {
private val channel = Channel<T>(bufferSize, BufferOverflow.DROP_OLDEST)
private var disposable: Stream.Disposable? = null

fun start(stream: Stream<T>): ReceiveChannel<T> {
disposable = stream.start(this)
return channel
}

override fun onComplete() {
channel.close()
disposable?.dispose()
}

override fun onError(throwable: Throwable) {
channel.close(throwable)
disposable?.dispose()
}

override fun onNext(data: T) {
channel.trySendBlocking(data).onClosed {
// ignore
}.onFailure {
// ignore
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@ package com.tinder.streamadapter.coroutines
import com.tinder.scarlet.StreamAdapter
import com.tinder.scarlet.utils.getRawType
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import java.lang.reflect.Type

/**
* A [stream adapter factory][StreamAdapter.Factory] that uses ReceiveChannel.
* A [stream adapter factory][StreamAdapter.Factory] that allows for [ReceiveChannel]
* and [Flow] based streams.
* [bufferSize] is configurable for the underlying channels, defaults to [DEFAULT_BUFFER]
*/
class CoroutinesStreamAdapterFactory : StreamAdapter.Factory {
private const val DEFAULT_BUFFER = 128

class CoroutinesStreamAdapterFactory(
private val bufferSize: Int = DEFAULT_BUFFER
) : StreamAdapter.Factory {

override fun create(type: Type): StreamAdapter<Any, Any> {
return when (type.getRawType()) {
ReceiveChannel::class.java -> ReceiveChannelStreamAdapter()
Flow::class.java -> FlowStreamAdapter(bufferSize)
ReceiveChannel::class.java -> ReceiveChannelStreamAdapter(bufferSize)
else -> throw IllegalArgumentException()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED
*/

package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.receiveAsFlow

class FlowStreamAdapter<T>(private val buffer: Int) : StreamAdapter<T, Flow<T>> {

override fun adapt(stream: Stream<T>): Flow<T> {
return ChannelForwarder<T>(buffer).start(stream).receiveAsFlow()
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
/*
* © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED
*/

package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.reactive.openSubscription

class ReceiveChannelStreamAdapter<T> : StreamAdapter<T, ReceiveChannel<T>> {
class ReceiveChannelStreamAdapter<T>(private val buffer: Int) : StreamAdapter<T, ReceiveChannel<T>> {

override fun adapt(stream: Stream<T>) = stream.openSubscription()
}
override fun adapt(stream: Stream<T>): ReceiveChannel<T> {
val channelForwarder = ChannelForwarder<T>(buffer)
return channelForwarder.start(stream)
}
}
Loading

0 comments on commit 72dcb2a

Please sign in to comment.