From e8949fe84be61572ce745942c7affee00726ff51 Mon Sep 17 00:00:00 2001 From: clementetb Date: Fri, 24 May 2024 09:38:23 +0200 Subject: [PATCH] [RKOTLIN-1079] Add support for transfer progress estimate (#1575) --- CHANGELOG.md | 4 +- .../realm/kotlin/internal/interop/Callback.kt | 2 +- .../kotlin/internal/interop/RealmInterop.kt | 6 +- .../src/main/jni/realm_api_helpers.cpp | 7 +- .../src/main/jni/realm_api_helpers.h | 2 +- .../mongodb/internal/SyncSessionImpl.kt | 8 +- .../io/realm/kotlin/mongodb/sync/Progress.kt | 11 +- .../realm/kotlin/mongodb/sync/SyncSession.kt | 2 - .../common/FLXProgressListenerTests.kt | 303 ++++++++++++++++++ ...erTests.kt => PBSProgressListenerTests.kt} | 194 +++++------ 10 files changed, 416 insertions(+), 123 deletions(-) create mode 100644 packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt rename packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/{ProgressListenerTests.kt => PBSProgressListenerTests.kt} (63%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 456b9b30d8..a250b2a84f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,13 +11,15 @@ This release will bump the Realm file format 24. Opening a file with an older fo * Some authentication related operations will no longer throw specialized `InvalidCredentialsException` and `CredentialsCannotBeLinkedException` but the more general `AuthException` and `ServiceException`. (Issue [#1763](https://github.com/realm/realm-kotlin/issues/1763)/[RKOTLIN-1091](https://jira.mongodb.org/browse/RKOTLIN-1091)) * [Sync] Removed deprecated methods `User.identity` and `User.provider`, user identities can be accessed with the already existing `User.identities`. (Issue [#1751](https://github.com/realm/realm-kotlin/issues/1751) [JIRA](https://jira.mongodb.org/browse/RKOTLIN-1083)) * [Sync] `App.allUsers` does no longer return a map, but only a list of users known locally. (Issue [#1751](https://github.com/realm/realm-kotlin/issues/1751) [JIRA](https://jira.mongodb.org/browse/RKOTLIN-1083)) -* [Sync ]Removed deprecated `DiscardUnsyncedChangesStrategy.onError`. (Issue [#1755](https://github.com/realm/realm-kotlin/issues/1755) [JIRA](https://jira.mongodb.org/browse/RKOTLIN-1085)) +* [Sync] Removed deprecated `DiscardUnsyncedChangesStrategy.onError`. (Issue [#1755](https://github.com/realm/realm-kotlin/issues/1755) [JIRA](https://jira.mongodb.org/browse/RKOTLIN-1085)) +* [Sync] Sync progress notifications now reports an estimate ranged from `0.0` to `1.0` with `Progress.estimate` instead of `transferredBytes` and `totalBytes`. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)). ### Enhancements * Support for RealmLists and RealmDictionaries in `RealmAny`. (Issue [#1434](https://github.com/realm/realm-kotlin/issues/1434)) * Optimized `RealmList.indexOf()` and `RealmList.contains()` using Core implementation of operations instead of iterating elements and comparing them in Kotlin. (Issue [#1625](https://github.com/realm/realm-kotlin/pull/1666) [RKOTLIN-995](https://jira.mongodb.org/browse/RKOTLIN-995)). * Add support for filtering logs by category. (Issue [#1691](https://github.com/realm/realm-kotlin/issues/1691) [JIRA](https://jira.mongodb.org/browse/RKOTLIN-1038)) * [Sync] Add Mongo Client API to access Atlas App Service collections. It can be accessed through `User.mongoClient`. (Issue [#972](https://github.com/realm/realm-kotlin/issues/972)/[RKOTLIN-612](https://jira.mongodb.org/browse/RKOTLIN-612)) +* [Sync] Sync progress notifications is now also supported for flexible sync configurations. (Issue [#1744](https://github.com/realm/realm-kotlin/issues/1744) [RKOTLIN-1079](https://jira.mongodb.org/browse/RKOTLIN-1079)). ### Fixed * Inserting the same typed link to the same key in a dictionary more than once would incorrectly create multiple backlinks to the object. This did not appear to cause any crashes later, but would have affecting explicit backlink count queries (eg: `...@links.@count`) and possibly notifications (Core Issue [realm/realm-core#7676](https://github.com/realm/realm-core/issues/7676) since v1.16.0). diff --git a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/Callback.kt b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/Callback.kt index c816735cad..211537c65e 100644 --- a/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/Callback.kt +++ b/packages/cinterop/src/commonMain/kotlin/io/realm/kotlin/internal/interop/Callback.kt @@ -85,7 +85,7 @@ fun interface AsyncOpenCallback { } fun interface ProgressCallback { - fun onChange(transferredBytes: Long, totalBytes: Long) + fun onChange(progressEstimate: Double) } fun interface ConnectionStateChangeCallback { diff --git a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt index 9409b13044..6f32c608f2 100644 --- a/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt +++ b/packages/cinterop/src/nativeDarwin/kotlin/io/realm/kotlin/internal/interop/RealmInterop.kt @@ -270,6 +270,7 @@ fun String.toRString(memScope: MemScope) = cValue { set(memScope, this@toRString) } +@OptIn(ExperimentalForeignApi::class) @Suppress("LargeClass", "FunctionNaming") actual object RealmInterop { @@ -2813,10 +2814,9 @@ actual object RealmInterop { return CPointerWrapper( realm_wrapper.realm_sync_session_register_progress_notifier( syncSession.cptr(), - staticCFunction { userData, transferred_bytes, total_bytes, _ -> + staticCFunction { userData, _, _, progress_estimate -> safeUserData(userData).run { - // TODO Progress ignored until https://github.com/realm/realm-kotlin/pull/1575 - onChange(transferred_bytes.toLong(), total_bytes.toLong()) + onChange(progress_estimate) } }, direction.nativeValue, diff --git a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp index b3b0f6dd49..c62c0757b2 100644 --- a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp +++ b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.cpp @@ -1244,14 +1244,13 @@ sync_after_client_reset_handler(realm_sync_config_t* config, jobject after_handl } void -realm_sync_session_progress_notifier_callback(void *userdata, uint64_t transferred_bytes, uint64_t total_bytes, double progress) { +realm_sync_session_progress_notifier_callback(void *userdata, uint64_t, uint64_t, double progress_estimate) { auto env = get_env(true); - // TODO Progress ignored until https://github.com/realm/realm-kotlin/pull/1575 - static JavaMethod java_callback_method(env, JavaClassGlobalDef::progress_callback(), "onChange", "(JJ)V"); + static JavaMethod java_callback_method(env, JavaClassGlobalDef::progress_callback(), "onChange", "(D)V"); jni_check_exception(env); - env->CallVoidMethod(static_cast(userdata), java_callback_method, jlong(transferred_bytes), jlong(total_bytes)); + env->CallVoidMethod(static_cast(userdata), java_callback_method, jdouble(progress_estimate)); jni_check_exception(env); } diff --git a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h index c1ea28ed47..a6bf096d4e 100644 --- a/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h +++ b/packages/jni-swig-stub/src/main/jni/realm_api_helpers.h @@ -107,7 +107,7 @@ void sync_after_client_reset_handler(realm_sync_config_t* config, jobject after_handler); void -realm_sync_session_progress_notifier_callback(void *userdata, uint64_t transferred_bytes, uint64_t total_bytes, double progress); +realm_sync_session_progress_notifier_callback(void *userdata, uint64_t, uint64_t, double progress_estimate); void realm_sync_session_connection_state_change_callback(void *userdata, realm_sync_connection_state_e old_state, realm_sync_connection_state_e new_state); diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt index 7a9d3416f6..4d6395dd1d 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/internal/SyncSessionImpl.kt @@ -16,7 +16,6 @@ package io.realm.kotlin.mongodb.internal -import io.realm.kotlin.internal.InternalConfiguration import io.realm.kotlin.internal.NotificationToken import io.realm.kotlin.internal.RealmImpl import io.realm.kotlin.internal.interop.CoreError @@ -109,9 +108,6 @@ internal open class SyncSessionImpl( direction: Direction, progressMode: ProgressMode, ): Flow { - if ((configuration as InternalConfiguration).isFlexibleSyncConfiguration) { - throw UnsupportedOperationException("Progress listeners are not supported for Flexible Sync.") - } return realm.scopedFlow { callbackFlow { val token: AtomicRef = @@ -124,8 +120,8 @@ internal open class SyncSessionImpl( Direction.UPLOAD -> ProgressDirection.RLM_SYNC_PROGRESS_DIRECTION_UPLOAD }, progressMode == ProgressMode.INDEFINITELY - ) { transferredBytes: Long, totalBytes: Long -> - val progress = Progress(transferredBytes.toULong(), totalBytes.toULong()) + ) { progressEstimate: Double -> + val progress = Progress(progressEstimate) trySendWithBufferOverflowCheck(progress) if (progressMode == ProgressMode.CURRENT_CHANGES && progress.isTransferComplete) { close() diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt index fb1ee49d43..7894b2de76 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/Progress.kt @@ -21,14 +21,9 @@ package io.realm.kotlin.mongodb.sync */ public data class Progress( /** - * Total number of bytes that has been transferred by the [SyncSession]. + * Transfer progress estimation ranged from 0.0 to 1.0. */ - val transferredBytes: ULong, - /** - * Total number of transferable bytes (bytes that have been transferred + pending bytes not - * yet transferred). - */ - val transferableBytes: ULong + val estimate: Double, ) { /** * Property indicating if all pending bytes have been transferred. @@ -40,5 +35,5 @@ public data class Progress( * flow can continue to emit events with `isTransferComplete = false` for subsequent events * after returning a progress indicator with `isTransferComplete = true`. */ - public val isTransferComplete: Boolean = transferredBytes >= transferableBytes + public val isTransferComplete: Boolean = estimate >= 1.0 } diff --git a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncSession.kt b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncSession.kt index c5998d83d0..fa612eb833 100644 --- a/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncSession.kt +++ b/packages/library-sync/src/commonMain/kotlin/io/realm/kotlin/mongodb/sync/SyncSession.kt @@ -136,8 +136,6 @@ public interface SyncSession { * * The flow has an internal buffer of [Channel.BUFFERED] but if the consumer fails to consume the * elements in a timely manner the flow will be completed with an [IllegalStateException]. - * - * @throws UnsupportedOperationException if invoked on a realm with Flexible Sync enabled. */ public fun progressAsFlow( direction: Direction, diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt new file mode 100644 index 0000000000..9fe4307aa6 --- /dev/null +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/FLXProgressListenerTests.kt @@ -0,0 +1,303 @@ +/* + * Copyright 2024 Realm Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.realm.kotlin.test.mongodb.common + +import io.realm.kotlin.Realm +import io.realm.kotlin.entities.sync.SyncObjectWithAllTypes +import io.realm.kotlin.ext.query +import io.realm.kotlin.internal.platform.runBlocking +import io.realm.kotlin.mongodb.User +import io.realm.kotlin.mongodb.sync.Direction +import io.realm.kotlin.mongodb.sync.Progress +import io.realm.kotlin.mongodb.sync.ProgressMode +import io.realm.kotlin.mongodb.sync.SyncConfiguration +import io.realm.kotlin.mongodb.syncSession +import io.realm.kotlin.test.mongodb.TEST_APP_FLEX +import io.realm.kotlin.test.mongodb.TestApp +import io.realm.kotlin.test.mongodb.createUserAndLogIn +import io.realm.kotlin.test.mongodb.use +import io.realm.kotlin.test.util.TestChannel +import io.realm.kotlin.test.util.receiveOrFail +import io.realm.kotlin.test.util.use +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.last +import kotlinx.coroutines.flow.scan +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.supervisorScope +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.withTimeout +import kotlin.random.Random +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue +import kotlin.test.fail +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +class FLXProgressListenerTests { + + private val TEST_SIZE = 10 + private val TIMEOUT = 30.seconds + + private lateinit var app: TestApp + private lateinit var partitionValue: String + + @BeforeTest + fun setup() { + app = TestApp(this::class.simpleName, appName = TEST_APP_FLEX) + partitionValue = org.mongodb.kbson.ObjectId().toString() + } + + @AfterTest + fun tearDown() { + if (this::app.isInitialized) { + app.close() + } + } + + @Test + fun downloadProgressListener_changesOnly() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> + // Verify that we: + // - get a "transferComplete" event + // - complete the flow, and + // - that all objects are available afterwards + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + // Ensure that we can do consecutive CURRENT_CHANGES registrations + for (i in 0 until 3) { + val transferCompleteJob = async { + // Postpone the progress listener flow so that it is started after the + // following downloadAllServerChanges. This should ensure that we are + // actually downloading stuff. + realm.syncSession.progressAsFlow( + Direction.DOWNLOAD, + ProgressMode.CURRENT_CHANGES + ).run { + withTimeout(TIMEOUT) { + last().let { progress: Progress -> + assertTrue(progress.isTransferComplete) + assertEquals(1.0, progress.estimate) + } + } + } + } + uploadRealm.writeSampleData( + TEST_SIZE, + timeout = TIMEOUT + ) + transferCompleteJob.await() + + // Progress.isTransferComplete does not guarantee that changes are integrated and + // visible in the realm + realm.syncSession.downloadAllServerChanges(TIMEOUT) + assertEquals( + TEST_SIZE * (i + 1), + realm.query().find().size + ) + } + } + } + } + + @Test + fun downloadProgressListener_indefinitely() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> + uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession + .progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + .completionCounter() + + withTimeout(TIMEOUT) { + flow.takeWhile { completed -> completed < 3 } + .collect { _ -> + uploadRealm.writeSampleData( + TEST_SIZE, + timeout = TIMEOUT + ) + } + } + } + } + } + + @Test + fun uploadProgressListener_changesOnly() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + for (i in 0..3) { + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) + .run { + withTimeout(TIMEOUT) { + last().let { + assertTrue(it.isTransferComplete) + assertEquals(1.0, it.estimate) + } + } + } + } + } + } + + @Test + fun uploadProgressListener_indefinitely() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + .completionCounter() + + withTimeout(TIMEOUT) { + flow.takeWhile { completed -> completed < 3 } + .collect { _ -> + realm.writeSampleData(TEST_SIZE) + } + } + } + } + + @Test + fun worksAfterExceptions() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + } + + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + assertFailsWith { + flow.collect { + @Suppress("TooGenericExceptionThrown") + throw RuntimeException("Crashing progress flow") + } + } + + withTimeout(TIMEOUT) { + flow.first { it.isTransferComplete } + } + } + } + + @Test + fun worksAfterCancel() = runBlocking { + Realm.open(createSyncConfig(app.createUserAndLogIn())).use { writerRealm -> + Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + // Setup a flow that we are just going to cancel + val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + + supervisorScope { + val mutex = Mutex(true) + val task = async { + flow.collect { + mutex.unlock() + } + } + // Await the flow actually being active, this requires actual data transfer as + // we arent guaranteed any initial events. + writerRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + mutex.lock() + task.cancel() + } + + // Verify that progress listeners still work + withTimeout(TIMEOUT) { + val task = async { flow.first { it.isTransferComplete } } + // Trigger data transfer to ensure we get an event at some point + writerRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + task.await() + } + } + } + } + + @Test + fun completesOnClose() = runBlocking { + val channel = TestChannel(capacity = 5, onBufferOverflow = BufferOverflow.DROP_OLDEST, failIfBufferIsEmptyOnCancel = false) + TestApp("completesOnClose", TEST_APP_FLEX).use { app -> + val user = app.createUserAndLogIn() + val realm = Realm.open(createSyncConfig(user)) + try { + val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + val job = async { + withTimeout(10.seconds) { + flow.collect { + channel.trySend(true) + } + } + } + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + // Wait for Flow to start, so we do not close the Realm before + // `flow.collect()` can be called. + channel.receiveOrFail() + realm.close() + job.await() + } finally { + channel.close() + if (!realm.isClosed()) { + realm.close() + } + } + } + } + + private suspend fun Realm.writeSampleData(count: Int, timeout: Duration? = null) { + repeat(count) { + write { + copyToRealm( + SyncObjectWithAllTypes().apply { + stringField = getTestPartitionValue() + binaryField = Random.nextBytes(100) + } + ) + } + } + timeout?.let { + assertTrue { syncSession.uploadAllLocalChanges(timeout) } + } + } + + // Operator that will return a flow that emits an increasing integer on each completion event + private fun Flow.completionCounter(): Flow = + filter { it.isTransferComplete } + .buffer(5, onBufferOverflow = BufferOverflow.DROP_OLDEST) + .scan(0) { accumulator, _ -> + accumulator + 1 + } + + private fun createSyncConfig( + user: User, + ): SyncConfiguration { + return SyncConfiguration.Builder(user, FLEXIBLE_SYNC_SCHEMA) + .initialSubscriptions { + add(it.query("stringField = $0", getTestPartitionValue())) + } + .build() + } + + private fun getTestPartitionValue(): String { + if (!this::partitionValue.isInitialized) { + fail("Test not setup correctly. Partition value is missing") + } + return partitionValue + } +} diff --git a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt similarity index 63% rename from packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt rename to packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt index a57e0bdbf5..53b93ea5c2 100644 --- a/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/ProgressListenerTests.kt +++ b/packages/test-sync/src/commonTest/kotlin/io/realm/kotlin/test/mongodb/common/PBSProgressListenerTests.kt @@ -20,16 +20,17 @@ import io.realm.kotlin.Realm import io.realm.kotlin.entities.sync.SyncObjectWithAllTypes import io.realm.kotlin.ext.query import io.realm.kotlin.internal.platform.runBlocking +import io.realm.kotlin.log.LogLevel +import io.realm.kotlin.log.RealmLog import io.realm.kotlin.mongodb.User import io.realm.kotlin.mongodb.sync.Direction import io.realm.kotlin.mongodb.sync.Progress import io.realm.kotlin.mongodb.sync.ProgressMode import io.realm.kotlin.mongodb.sync.SyncConfiguration +import io.realm.kotlin.mongodb.sync.SyncSession import io.realm.kotlin.mongodb.syncSession -import io.realm.kotlin.test.mongodb.TEST_APP_FLEX import io.realm.kotlin.test.mongodb.TEST_APP_PARTITION import io.realm.kotlin.test.mongodb.TestApp -import io.realm.kotlin.test.mongodb.common.utils.assertFailsWithMessage import io.realm.kotlin.test.mongodb.common.utils.uploadAllLocalChangesOrFail import io.realm.kotlin.test.mongodb.createUserAndLogIn import io.realm.kotlin.test.mongodb.use @@ -38,23 +39,18 @@ import io.realm.kotlin.test.util.receiveOrFail import io.realm.kotlin.test.util.use import kotlinx.coroutines.async import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.drop import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.last -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.scan import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.withTimeout -import org.mongodb.kbson.ObjectId +import kotlin.random.Random import kotlin.test.AfterTest import kotlin.test.BeforeTest -import kotlin.test.Ignore import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -63,18 +59,18 @@ import kotlin.test.fail import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -private const val TEST_SIZE = 500 -private val TIMEOUT = 30.seconds - -class ProgressListenerTests { +class PBSProgressListenerTests { + private val TEST_SIZE = 10 + private val TIMEOUT = 30.seconds private lateinit var app: TestApp private lateinit var partitionValue: String @BeforeTest fun setup() { + RealmLog.setLevel(LogLevel.INFO) app = TestApp(this::class.simpleName, appName = TEST_APP_PARTITION) - partitionValue = ObjectId().toString() + partitionValue = org.mongodb.kbson.ObjectId().toString() } @AfterTest @@ -85,7 +81,6 @@ class ProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun downloadProgressListener_changesOnly() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> // Verify that we: @@ -94,27 +89,31 @@ class ProgressListenerTests { // - that all objects are available afterwards Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> // Ensure that we can do consecutive CURRENT_CHANGES registrations - for (i in 0 until 3) { - uploadRealm.writeSampleData( - TEST_SIZE, - idOffset = TEST_SIZE * i, - timeout = TIMEOUT - ) - - // We are not sure when the realm actually knows of the remote changes and consider - // them current, so wait a bit - delay(10.seconds) - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES) - .run { + repeat(3) { iteration -> + val transferCompleteJob = async { + realm.syncSession.progressAsFlow( + Direction.DOWNLOAD, + ProgressMode.CURRENT_CHANGES + ).run { withTimeout(TIMEOUT) { - assertTrue(last().isTransferComplete) + last().let { progress: Progress -> + assertTrue(progress.isTransferComplete) + assertEquals(1.0, progress.estimate) + } } } + } + realm.syncSession.runWhilePaused { + uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + } + + transferCompleteJob.await() + // Progress.isTransferComplete does not guarantee that changes are integrated and // visible in the realm realm.syncSession.downloadAllServerChanges(TIMEOUT) assertEquals( - TEST_SIZE * (i + 1), + TEST_SIZE * (iteration + 1), realm.query().find().size ) } @@ -125,19 +124,20 @@ class ProgressListenerTests { @Test fun downloadProgressListener_indefinitely() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { uploadRealm -> - uploadRealm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) - Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + val flow = realm.syncSession + .progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) .completionCounter() + withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } .collect { completed -> - uploadRealm.writeSampleData( - TEST_SIZE, - idOffset = (completed + 1) * TEST_SIZE, - timeout = TIMEOUT - ) + realm.syncSession.runWhilePaused { + uploadRealm.writeSampleData( + TEST_SIZE, + timeout = TIMEOUT + ) + } } } } @@ -147,13 +147,17 @@ class ProgressListenerTests { @Test fun uploadProgressListener_changesOnly() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - for (i in 0..3) { - realm.writeSampleData(TEST_SIZE, idOffset = TEST_SIZE * i, timeout = TIMEOUT) - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES).run { - withTimeout(TIMEOUT) { - assertTrue(last().isTransferComplete) + repeat(3) { + realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) + .run { + withTimeout(TIMEOUT) { + last().let { + assertTrue(it.isTransferComplete) + assertEquals(1.0, it.estimate) + } + } } - } } } } @@ -161,13 +165,16 @@ class ProgressListenerTests { @Test fun uploadProgressListener_indefinitely() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> - val flow = realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + val flow = realm.syncSession + .progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) .completionCounter() withTimeout(TIMEOUT) { flow.takeWhile { completed -> completed < 3 } - .collect { completed -> - realm.writeSampleData(TEST_SIZE, idOffset = (completed + 1) * TEST_SIZE) + .collect { _ -> + realm.syncSession.runWhilePaused { + realm.writeSampleData(TEST_SIZE) + } realm.syncSession.uploadAllLocalChangesOrFail() } } @@ -175,22 +182,22 @@ class ProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun worksAfterExceptions() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) } Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> + val flow = realm.syncSession + .progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + assertFailsWith { - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) - .collect { - @Suppress("TooGenericExceptionThrown") - throw RuntimeException("Crashing progress flow") - } + flow.collect { + @Suppress("TooGenericExceptionThrown") + throw RuntimeException("Crashing progress flow") + } } - val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) withTimeout(TIMEOUT) { flow.first { it.isTransferComplete } } @@ -198,7 +205,6 @@ class ProgressListenerTests { } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun worksAfterCancel() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> realm.writeSampleData(TEST_SIZE, timeout = TIMEOUT) @@ -206,7 +212,10 @@ class ProgressListenerTests { Realm.open(createSyncConfig(app.createUserAndLogin())).use { realm -> // Setup a flow that we are just going to cancel - val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + val flow = + realm.syncSession + .progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + supervisorScope { val mutex = Mutex(true) val task = async { @@ -220,16 +229,13 @@ class ProgressListenerTests { } // Verify that progress listeners still work - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES).run { - withTimeout(TIMEOUT) { - flow.first { it.isTransferComplete } - } + withTimeout(TIMEOUT) { + flow.first { it.isTransferComplete } } } } @Test - @Ignore // https://github.com/realm/realm-core/issues/7627 fun triggerImmediatelyWhenRegistered() = runBlocking { Realm.open(createSyncConfig(app.createUserAndLogIn())).use { realm -> withTimeout(10.seconds) { @@ -238,37 +244,28 @@ class ProgressListenerTests { assertTrue { realm.syncSession.downloadAllServerChanges() } // Ensure that progress listeners are triggered at least one time even though there // is no data - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES).first() - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES).first() - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY).first() - realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY).first() - } - } - } - - @Test - fun throwsOnFlexibleSync() = runBlocking { - TestApp("throwsOnFlexibleSync", TEST_APP_FLEX).use { - val user = app.createUserAndLogIn() - val configuration: SyncConfiguration = SyncConfiguration.create(user, FLEXIBLE_SYNC_SCHEMA) - Realm.open(configuration).use { realm -> - assertFailsWithMessage( - "Progress listeners are not supported for Flexible Sync" - ) { - realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES) - } + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.CURRENT_CHANGES) + .first() + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.CURRENT_CHANGES) + .first() + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + .first() + realm.syncSession.progressAsFlow(Direction.UPLOAD, ProgressMode.INDEFINITELY) + .first() } } } @Test fun completesOnClose() = runBlocking { - val channel = TestChannel(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + val channel = + TestChannel(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) TestApp("completesOnClose", TEST_APP_PARTITION).use { app -> val user = app.createUserAndLogIn() val realm = Realm.open(createSyncConfig(user)) try { - val flow = realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) + val flow = + realm.syncSession.progressAsFlow(Direction.DOWNLOAD, ProgressMode.INDEFINITELY) val job = async { withTimeout(10.seconds) { flow.collect { @@ -290,36 +287,33 @@ class ProgressListenerTests { } } - private suspend fun Realm.writeSampleData(count: Int, idOffset: Int = 0, timeout: Duration? = null) { - write { - for (i in idOffset until count + idOffset) { - copyToRealm(SyncObjectWithAllTypes().apply { stringField = "Object $i" }) + private suspend fun Realm.writeSampleData(count: Int, timeout: Duration? = null) { + repeat(count) { + write { + copyToRealm( + SyncObjectWithAllTypes() + .apply { + binaryField = Random.nextBytes(100) + } + ) } } + timeout?.let { assertTrue { syncSession.uploadAllLocalChanges(timeout) } } } - // Operator that will return a flow that emits an incresing integer on each completion event + // Operator that will return a flow that emits an increasing integer on each completion event private fun Flow.completionCounter(): Flow = filter { it.isTransferComplete } - .distinctUntilChanged() - // Increment completed count if we are done transferring and the amount of bytes has - // increased - .scan(0UL to 0) { (bytes, completed), progress -> - if (progress.isTransferComplete && progress.transferableBytes > bytes) { - (progress.transferredBytes to completed + 1) - } else { - (bytes to completed) - } + .scan(0) { accumulator, _ -> + accumulator + 1 } - .drop(1) - .map { (_, completed) -> completed } private fun createSyncConfig( user: User, - partitionValue: String = getTestPartitionValue() + partitionValue: String = getTestPartitionValue(), ): SyncConfiguration { return SyncConfiguration.Builder(user, partitionValue, PARTITION_BASED_SCHEMA) .build() @@ -331,4 +325,10 @@ class ProgressListenerTests { } return partitionValue } + + private suspend fun SyncSession.runWhilePaused(block: suspend () -> Unit) { + pause() + block() + resume() + } }