diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b8f417b7c..ebee13f301 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ ### Fixed * Cache notification callback JNI references at startup to ensure that symbols can be resolved in core callbacks. (Issue [#1577](https://github.com/realm/realm-kotlin/issues/1577)) +* Using `Realm.asFlow()` could miss an update if a write was started right after opening the Realm. (Issue [#1582](https://github.com/realm/realm-kotlin/issues/1582)) ### Compatibility * File format: Generates Realms with file format v23. diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt index e99972f1ce..463d604eea 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/RealmImpl.kt @@ -46,7 +46,9 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.withIndex import kotlinx.coroutines.launch import kotlin.reflect.KClass @@ -65,7 +67,7 @@ public class RealmImpl private constructor( internal val realmScope = CoroutineScope(SupervisorJob() + notificationScheduler.dispatcher) private val notifierFlow: MutableSharedFlow> = - MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + MutableSharedFlow(onBufferOverflow = BufferOverflow.DROP_OLDEST, replay = 1) private val notifier = SuspendableNotifier( owner = this, @@ -208,7 +210,13 @@ public class RealmImpl private constructor( } override fun asFlow(): Flow> = scopedFlow { - notifierFlow.onStart { emit(InitialRealmImpl(this@RealmImpl)) } + notifierFlow.withIndex() + .map { (index, change) -> + when (index) { + 0 -> InitialRealmImpl(this) + else -> change + } + } } override fun writeCopyTo(configuration: Configuration) { diff --git a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt index e075e86dae..ef64cc8da2 100644 --- a/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt +++ b/packages/library-base/src/commonMain/kotlin/io/realm/kotlin/internal/SuspendableNotifier.kt @@ -44,7 +44,7 @@ internal class SuspendableNotifier( // see https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt#L78 private val _realmChanged = MutableSharedFlow( onBufferOverflow = BufferOverflow.DROP_OLDEST, - extraBufferCapacity = 1 + replay = 1 ) val dispatcher: CoroutineDispatcher = scheduler.dispatcher @@ -89,7 +89,10 @@ internal class SuspendableNotifier( // Touching realm will open the underlying realm and register change listeners, but must // happen on the dispatcher as the realm can only be touched on the dispatcher's thread. if (!realmInitializer.isInitialized()) { - withContext(dispatcher) { realm } + withContext(dispatcher) { + realm + _realmChanged.emit(realm.version()) + } } return _realmChanged.asSharedFlow() } diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmDictionaryTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmDictionaryTests.kt index 4c903effe0..3ccc1f648e 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmDictionaryTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmDictionaryTests.kt @@ -1261,7 +1261,7 @@ class RealmDictionaryTests : EmbeddedObjectCollectionQueryTests { val listener = async { withTimeout(10.seconds) { flow.collect { current -> - delay(30.milliseconds) + delay(100.milliseconds) } } } diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmListTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmListTests.kt index 4e1bcf0b8f..a25a0fd109 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmListTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmListTests.kt @@ -570,7 +570,7 @@ class RealmListTests : EmbeddedObjectCollectionQueryTests { val listener = async { withTimeout(10.seconds) { flow.collect { current -> - delay(30.milliseconds) + delay(100.milliseconds) } } } diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmSetTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmSetTests.kt index 437ff45ebc..f03eede904 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmSetTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/RealmSetTests.kt @@ -575,7 +575,7 @@ class RealmSetTests : CollectionQueryTests { val listener = async { withTimeout(10.seconds) { flow.collect { current -> - delay(30.milliseconds) + delay(100.milliseconds) } } } diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt index d79035d188..1dc47b670d 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/VersionTrackingTests.kt @@ -103,7 +103,7 @@ class VersionTrackingTests { // Write that doesn't return objects does not trigger tracking additional versions realm.write { copyToRealm(Sample()) } realm.activeVersions().run { - assertEquals(1, allTracked.size, toString()) + assertTrue(1 >= allTracked.size, toString()) assertNotNull(writer, toString()) assertEquals(0, writer?.active?.size, toString()) } @@ -111,7 +111,7 @@ class VersionTrackingTests { // Until we actually query the object realm.query().find() realm.activeVersions().run { - assertEquals(2, allTracked.size, toString()) + assertTrue(2 >= allTracked.size, toString()) assertNotNull(writer, toString()) assertEquals(1, writer?.active?.size, toString()) } @@ -129,7 +129,7 @@ class VersionTrackingTests { // not assigned to a variable unless the generic return type is ) realm.write { copyToRealm(Sample()) } realm.activeVersions().run { - assertEquals(2, allTracked.size, toString()) + assertTrue(2 >= allTracked.size, toString()) assertNotNull(writer, toString()) assertEquals(1, writer?.active?.size, toString()) } @@ -219,8 +219,8 @@ class VersionTrackingTests { @Suppress("invisible_member", "invisible_reference") fun initialVersionDereferencedAfterFirstWrite() { (realm as RealmImpl).let { realm -> - assertNotNull(realm.initialRealmReference.value, toString()) - assertEquals(1, realm.versionTracker.versions().size, toString()) + val intermediateVersions = realm.versionTracker.versions() + assertEquals(1, intermediateVersions.size, intermediateVersions.toString()) val realmUpdates = TestChannel() @@ -238,9 +238,11 @@ class VersionTrackingTests { // Wait for the notifier to start realmUpdates.receiveOrFail() - assertNull(realm.initialRealmReference.value, toString()) - assertEquals(1, realm.versionTracker.versions().size, toString()) + // Depending on the exact timing, the first version might or might not have been + // GC'ed. If GC'ed, there are no intermediate versions. + val trackedVersions = realm.versionTracker.versions() + assertTrue(1 >= trackedVersions.size, trackedVersions.toString()) deferred.cancel() realmUpdates.close() diff --git a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/notifications/RealmNotificationsTests.kt b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/notifications/RealmNotificationsTests.kt index 2fec01968d..71ccea9759 100644 --- a/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/notifications/RealmNotificationsTests.kt +++ b/packages/test-base/src/commonTest/kotlin/io/realm/kotlin/test/common/notifications/RealmNotificationsTests.kt @@ -90,6 +90,45 @@ class RealmNotificationsTests : FlowableTests { } } + @Test + fun registerTwoFlows() = runBlocking { + val c1 = TestChannel>() + val c2 = TestChannel>() + val startingVersion = realm.version() + val observer1 = async { + realm.asFlow().collect { + c1.send(it) + } + } + c1.receiveOrFail(message = "Failed to receive initial event on Channel 1").let { realmChange -> + assertIs>(realmChange) + assertEquals(startingVersion, realmChange.realm.version()) + } + + realm.write { /* Do nothing */ } + val nextVersion = realm.version() + + val observer2 = async { + realm.asFlow().collect { + c2.send(it) + } + } + + c1.receiveOrFail(message = "Failed to receive update event on Channel 1").let { realmChange -> + assertIs>(realmChange) + assertEquals(nextVersion, realmChange.realm.version()) + } + c2.receiveOrFail(message = "Failed to receive initial event on Channel 2").let { realmChange -> + assertIs>(realmChange) + assertEquals(nextVersion, realmChange.realm.version()) + } + + observer1.cancel() + observer2.cancel() + c1.cancel() + c2.cancel() + } + @Test override fun asFlow() { runBlocking {