Skip to content

Commit

Permalink
Improve usage of Channels in tests (#1585)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmelchior authored Dec 7, 2023
1 parent 16abd75 commit 0834a9c
Show file tree
Hide file tree
Showing 22 changed files with 223 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.types.RealmInstant
import io.realm.kotlin.types.RealmObject
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.withTimeout
import kotlinx.datetime.Instant
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
Expand Down Expand Up @@ -93,13 +94,48 @@ fun Instant.toRealmInstant(): RealmInstant {
}
}

/**
* Channel implementation specifically suited for tests. Its size is unlimited, but will fail
* the test if canceled while still containing unconsumed elements.
*/
inline fun <T> TestChannel(
capacity: Int = Channel.UNLIMITED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
failIfBufferIsEmptyOnCancel: Boolean = true
): Channel<T> {
return Channel<T>(capacity = capacity, onBufferOverflow = onBufferOverflow) {
if (failIfBufferIsEmptyOnCancel) {
throw AssertionError("Failed to deliver: $it")
}
}
}

/**
* Helper method that will use `trySend` to send a message to a Channel and throw
* an error if `trySend` returns false
*/
inline fun <T : Any?> Channel<T>.trySendOrFail(value: T) {
val result: ChannelResult<Unit> = this.trySend(value)
if (result.isFailure) {
val additionalErrorInfo = result.exceptionOrNull()?.let { throwable ->
" An exception was thrown:\n${throwable.stackTraceToString()}"
} ?: ""
throw AssertionError("Could not send message to channel: $value.$additionalErrorInfo")
}
}

// Variant of `Channel.receiveOrFail()` that will will throw if a timeout is hit.
suspend fun <T : Any?> Channel<T>.receiveOrFail(timeout: Duration = 1.minutes, message: String? = null): T {
return select {
this@receiveOrFail.onReceive { it }
onTimeout(timeout) {
@Suppress("invisible_member")
throw TimeoutCancellationException("Timeout: $message")
try {
return withTimeout(timeout) {
// Note, using `select` with `onReceive` seems to cause some tests to hang for unknown
// reasons. Right now the hypothesis is that because `onReceive` does not consume the
// elements, it is causing some kind of race condition we have not been able to
// find. For previous iterations of this method, see the Git history.
this@receiveOrFail.receive()
}
} catch (ex: TimeoutCancellationException) {
@Suppress("invisible_reference", "invisible_member")
throw TimeoutCancellationException("Timeout after $timeout: ${if (message.isNullOrBlank()) "<no message>" else message}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import io.realm.kotlin.query.sum
import io.realm.kotlin.schema.RealmStorageType
import io.realm.kotlin.test.common.utils.assertFailsWithMessage
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.test.util.TestChannel
import io.realm.kotlin.test.util.TypeDescriptor
import io.realm.kotlin.test.util.receiveOrFail
import io.realm.kotlin.types.ObjectId
Expand Down Expand Up @@ -342,7 +343,7 @@ class QueryTests {

@Test
fun asFlow_initialResults() {
val channel = Channel<ResultsChange<QuerySample>>(1)
val channel = TestChannel<ResultsChange<QuerySample>>()

runBlocking {
val observer = async {
Expand All @@ -366,7 +367,7 @@ class QueryTests {

@Test
fun asFlow() {
val channel = Channel<ResultsChange<QuerySample>>(1)
val channel = TestChannel<ResultsChange<QuerySample>>()

runBlocking {
val observer = async {
Expand Down Expand Up @@ -399,7 +400,7 @@ class QueryTests {

@Test
fun asFlow_deleteObservable() {
val channel = Channel<ResultsChange<QuerySample>>(1)
val channel = TestChannel<ResultsChange<QuerySample>>()

runBlocking {
realm.writeBlocking {
Expand Down Expand Up @@ -1003,7 +1004,7 @@ class QueryTests {

@Test
fun count_asFlow_initialValue() {
val channel = Channel<Long>(1)
val channel = TestChannel<Long>()

runBlocking {
val observer = async {
Expand All @@ -1025,7 +1026,7 @@ class QueryTests {

@Test
fun count_asFlow() {
val channel = Channel<Long>(1)
val channel = TestChannel<Long>()

runBlocking {
val observer = async {
Expand All @@ -1052,7 +1053,7 @@ class QueryTests {

@Test
fun count_asFlow_deleteObservable() {
val channel = Channel<Long>(1)
val channel = TestChannel<Long>()

runBlocking {
realm.write {
Expand Down Expand Up @@ -1085,8 +1086,8 @@ class QueryTests {
@Test
fun count_asFlow_cancel() {
runBlocking {
val channel1 = Channel<Long?>(1)
val channel2 = Channel<Long?>(1)
val channel1 = TestChannel<Long?>()
val channel2 = TestChannel<Long?>()

val observer1 = async {
realm.query<QuerySample>()
Expand Down Expand Up @@ -2315,7 +2316,7 @@ class QueryTests {

@Test
fun playground_multiThreadScenario() {
val channel = Channel<RealmResults<QuerySample>>(1)
val channel = TestChannel<RealmResults<QuerySample>>()
var query: RealmQuery<QuerySample>? = null
val scope = singleThreadDispatcher("1")
val intValue = 666
Expand Down Expand Up @@ -2577,7 +2578,7 @@ class QueryTests {
type: AggregatorQueryType,
propertyDescriptor: PropertyDescriptor
) {
val channel = Channel<Any?>(1)
val channel = TestChannel<Any?>()
val expectedAggregate = when (type) {
AggregatorQueryType.MIN -> expectedMin(propertyDescriptor.clazz)
AggregatorQueryType.MAX -> expectedMax(propertyDescriptor.clazz)
Expand Down Expand Up @@ -2727,7 +2728,7 @@ class QueryTests {
type: AggregatorQueryType,
propertyDescriptor: PropertyDescriptor
) {
val channel = Channel<Any?>(1)
val channel = TestChannel<Any?>()
val expectedAggregate = when (type) {
AggregatorQueryType.MIN -> expectedMin(propertyDescriptor.clazz)
AggregatorQueryType.MAX -> expectedMax(propertyDescriptor.clazz)
Expand Down Expand Up @@ -2825,8 +2826,8 @@ class QueryTests {
type: AggregatorQueryType,
propertyDescriptor: PropertyDescriptor
) {
val channel1 = Channel<Any?>(1)
val channel2 = Channel<Any?>(1)
val channel1 = TestChannel<Any?>()
val channel2 = TestChannel<Any?>()

runBlocking {
// Subscribe to flow 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import io.realm.kotlin.notifications.UpdatedObject
import io.realm.kotlin.query.find
import io.realm.kotlin.test.common.utils.assertFailsWithMessage
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.test.util.TestChannel
import io.realm.kotlin.test.util.TypeDescriptor
import io.realm.kotlin.test.util.receiveOrFail
import io.realm.kotlin.test.util.use
Expand All @@ -44,7 +45,6 @@ import io.realm.kotlin.types.RealmObject
import io.realm.kotlin.types.RealmUUID
import io.realm.kotlin.types.annotations.Index
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import org.mongodb.kbson.BsonDecimal128
import org.mongodb.kbson.BsonObjectId
import org.mongodb.kbson.Decimal128
Expand Down Expand Up @@ -373,8 +373,8 @@ class RealmAnyTests {
@Test
fun managed_deleteObjectInsideRealmAnyTriggersUpdateInContainer() {
runBlocking {
val sampleChannel = Channel<SingleQueryChange<Sample>>(1)
val containerChannel = Channel<SingleQueryChange<RealmAnyContainer>>(1)
val sampleChannel = TestChannel<SingleQueryChange<Sample>>()
val containerChannel = TestChannel<SingleQueryChange<RealmAnyContainer>>()

val sampleObserver = async {
realm.query<Sample>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import io.realm.kotlin.internal.platform.fileExists
import io.realm.kotlin.internal.platform.runBlocking
import io.realm.kotlin.test.common.utils.assertFailsWithMessage
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.test.util.TestChannel
import io.realm.kotlin.test.util.TestHelper
import io.realm.kotlin.test.util.receiveOrFail
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.withTimeout
import kotlin.random.Random
import kotlin.test.AfterTest
Expand Down Expand Up @@ -181,9 +181,9 @@ class RealmInMemoryTests {
@Test
fun multiThread() {
val threadError = arrayOfNulls<Exception>(1)
val workerCommittedChannel = Channel<Boolean>(1)
val workerClosedChannel = Channel<Boolean>(1)
val realmInMainClosedChannel = Channel<Boolean>(1)
val workerCommittedChannel = TestChannel<Boolean>()
val workerClosedChannel = TestChannel<Boolean>()
val realmInMainClosedChannel = TestChannel<Boolean>()
runBlocking {
// Step 2.
async {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import io.realm.kotlin.query.find
import io.realm.kotlin.test.common.utils.assertFailsWithMessage
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.test.platform.platformFileSystem
import io.realm.kotlin.test.util.TestChannel
import io.realm.kotlin.test.util.receiveOrFail
import io.realm.kotlin.test.util.use
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
Expand Down Expand Up @@ -449,9 +449,9 @@ class RealmTests {
.directory(testDir)
.build()

val bgThreadReadyChannel = Channel<Unit>(1)
val readyToCloseChannel = Channel<Unit>(1)
val closedChannel = Channel<Unit>(1)
val bgThreadReadyChannel = TestChannel<Unit>()
val readyToCloseChannel = TestChannel<Unit>()
val closedChannel = TestChannel<Unit>()

runBlocking {
val testRealm = Realm.open(configuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import io.realm.kotlin.log.RealmLog
import io.realm.kotlin.notifications.RealmChange
import io.realm.kotlin.notifications.ResultsChange
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.test.util.TestChannel
import io.realm.kotlin.test.util.receiveOrFail
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -171,7 +171,7 @@ class VersionTrackingTests {

// Listening to object causes tracking of all versions even if not returned by the write
val samples = mutableListOf<ResultsChange<Sample>>()
val channel = Channel<ResultsChange<Sample>>(1)
val channel = TestChannel<ResultsChange<Sample>>()
val initialVersion = realm.version().version
val writes = 5
val objectListener = async {
Expand Down Expand Up @@ -222,12 +222,12 @@ class VersionTrackingTests {
assertNotNull(realm.initialRealmReference.value, toString())
assertEquals(1, realm.versionTracker.versions().size, toString())

val realmUpdates = Channel<Unit>(1)
val realmUpdates = TestChannel<Unit>()

runBlocking {
val deferred = async {
realm.asFlow().collect {
realmUpdates.trySend(Unit)
realmUpdates.send(Unit)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.realm.kotlin.notifications.UpdatedResults
import io.realm.kotlin.query.RealmResults
import io.realm.kotlin.test.common.utils.RealmEntityNotificationTests
import io.realm.kotlin.test.platform.PlatformUtils
import io.realm.kotlin.test.util.TestChannel
import io.realm.kotlin.test.util.receiveOrFail
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
Expand Down Expand Up @@ -83,13 +84,13 @@ class BacklinksNotificationsTests : RealmEntityNotificationTests {
sample.setBacklinks
)
}.forEach { results ->
val c = Channel<ResultsChange<Sample>>(1)
val c = TestChannel<ResultsChange<Sample>>()

val observer = async {
results
.asFlow()
.collect {
c.trySend(it)
c.send(it)
}
}

Expand Down Expand Up @@ -141,7 +142,7 @@ class BacklinksNotificationsTests : RealmEntityNotificationTests {
results
.asFlow()
.collect {
c.trySend(it)
c.send(it)
}
}

Expand Down Expand Up @@ -180,7 +181,7 @@ class BacklinksNotificationsTests : RealmEntityNotificationTests {
val c = Channel<ResultsChange<Sample>>(capacity = 5)
val collection = async {
target.objectBacklinks.asFlow().collect {
c.trySend(it)
c.send(it)
}
}

Expand Down Expand Up @@ -235,17 +236,17 @@ class BacklinksNotificationsTests : RealmEntityNotificationTests {
setBacklinks
)
}.forEach { results ->
val c1 = Channel<ResultsChange<Sample>>(1)
val c2 = Channel<ResultsChange<Sample>>(1)
val c1 = TestChannel<ResultsChange<Sample>>()
val c2 = TestChannel<ResultsChange<Sample>>()

val observer1 = async {
results.asFlow().collect {
c1.trySend(it)
c1.send(it)
}
}
val observer2 = async {
results.asFlow().collect {
c2.trySend(it)
c2.send(it)
}
}

Expand Down Expand Up @@ -290,10 +291,10 @@ class BacklinksNotificationsTests : RealmEntityNotificationTests {
results
.asFlow()
.onCompletion {
c.trySend(null)
c.send(null)
}
.collect {
c.trySend(it)
c.send(it)
}
}

Expand All @@ -302,10 +303,10 @@ class BacklinksNotificationsTests : RealmEntityNotificationTests {
.query("TRUEPREDICATE")
.asFlow()
.onCompletion {
sc.trySend(null)
sc.send(null)
}
.collect {
sc.trySend(it)
sc.send(it)
}
}

Expand Down
Loading

0 comments on commit 0834a9c

Please sign in to comment.