Skip to content

Commit

Permalink
Add sync migrations with basic test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
clementetb committed Jun 28, 2024
1 parent f83118f commit fa96c03
Show file tree
Hide file tree
Showing 14 changed files with 483 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ object CoreErrorConverter {
return userError ?: when {
ErrorCode.RLM_ERR_INDEX_OUT_OF_BOUNDS == errorCode ->
IndexOutOfBoundsException(message)
ErrorCode.RLM_ERR_INVALID_SCHEMA_CHANGE == errorCode ||
ErrorCode.RLM_ERR_INVALID_SCHEMA_VERSION == errorCode ->
InvalidSchemaException(message)
ErrorCategory.RLM_ERR_CAT_INVALID_ARG in categories && ErrorCategory.RLM_ERR_CAT_SYNC_ERROR !in categories -> {
// Some sync errors flagged as both logical and illegal. In our case, we consider those
// IllegalState, so discard them them here and let them fall through to the bottom case
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024 Realm Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.realm.kotlin.internal.interop

class InvalidSchemaException(override val message: String?) : IllegalStateException()
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ expect object RealmInterop {
// dispatcher. The realm itself must also be opened on the same thread
fun realm_open(config: RealmConfigurationPointer, scheduler: RealmSchedulerPointer): Pair<LiveRealmPointer, Boolean>

fun realm_open(config: RealmConfigurationPointer): LiveRealmPointer

// Opening a Realm asynchronously. Only supported for synchronized realms.
fun realm_open_synchronized(config: RealmConfigurationPointer): RealmAsyncOpenTaskPointer
fun realm_async_open_task_start(task: RealmAsyncOpenTaskPointer, callback: AsyncOpenCallback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ actual object RealmInterop {
return Pair(realmPtr, fileCreated)
}

actual fun realm_open(
config: RealmConfigurationPointer,
): LiveRealmPointer {
val realmPtr = LongPointerWrapper<LiveRealmT>(realmc.realm_open(config.cptr()))
return realmPtr
}

actual fun realm_open_synchronized(config: RealmConfigurationPointer): RealmAsyncOpenTaskPointer {
return LongPointerWrapper(realmc.realm_open_synchronized(config.cptr()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ actual enum class ErrorCode(

actual companion object {
actual fun of(nativeValue: Int): ErrorCode? =
values().firstOrNull { value ->
entries.firstOrNull { value ->
value.nativeValue == nativeValue
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,11 @@ actual object RealmInterop {
return Pair(realmPtr, fileCreated.value)
}

actual fun realm_open(config: RealmConfigurationPointer): LiveRealmPointer {
val realmPtr = CPointerWrapper<LiveRealmT>(realm_wrapper.realm_open(config.cptr()))
return realmPtr
}

actual fun realm_create_scheduler(): RealmSchedulerPointer {
// If there is no notification dispatcher use the default scheduler.
// Re-verify if this is actually needed when notification scheduler is fully in place.
Expand Down
2 changes: 1 addition & 1 deletion packages/external/core
Submodule core updated 217 files
Original file line number Diff line number Diff line change
Expand Up @@ -283,18 +283,6 @@ public interface Configuration {
this.writeDispatcher = dispatcher
} as S

/**
* Sets the schema version of the Realm. This must be equal to or higher than the schema
* version of the existing Realm file, if any. If the schema version is higher than the
* already existing Realm, a migration is needed.
*/
public fun schemaVersion(schemaVersion: Long): S {
if (schemaVersion < 0) {
throw IllegalArgumentException("Realm schema version numbers must be 0 (zero) or higher. Yours was: $schemaVersion")
}
return apply { this.schemaVersion = schemaVersion } as S
}

/**
* Sets the 64 byte key used to encrypt and decrypt the Realm file. If no key is provided
* the Realm file will be unencrypted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ public interface RealmConfiguration : Configuration {
this.automaticEmbeddedObjectConstraintsResolution = resolveEmbeddedObjectConstraints
}

/**
* Sets the schema version of the Realm. This must be equal to or higher than the schema
* version of the existing Realm file, if any. If the schema version is higher than the
* already existing Realm, a migration is needed.
*/
public fun schemaVersion(schemaVersion: Long): Builder {
if (schemaVersion < 0) {
throw IllegalArgumentException("Realm schema version numbers must be 0 (zero) or higher. Yours was: $schemaVersion")
}
return apply { this.schemaVersion = schemaVersion }
}

override fun name(name: String): Builder = apply {
checkName(name)
this.name = name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ import io.realm.kotlin.internal.RealmImpl
import io.realm.kotlin.internal.TypedFrozenRealmImpl
import io.realm.kotlin.internal.interop.AsyncOpenCallback
import io.realm.kotlin.internal.interop.FrozenRealmPointer
import io.realm.kotlin.internal.interop.InvalidSchemaException
import io.realm.kotlin.internal.interop.LiveRealmPointer
import io.realm.kotlin.internal.interop.LiveRealmT
import io.realm.kotlin.internal.interop.NativePointer
import io.realm.kotlin.internal.interop.RealmAppPointer
import io.realm.kotlin.internal.interop.RealmAsyncOpenTaskPointer
import io.realm.kotlin.internal.interop.RealmConfigurationPointer
import io.realm.kotlin.internal.interop.RealmInterop
import io.realm.kotlin.internal.interop.RealmSyncConfigurationPointer
import io.realm.kotlin.internal.interop.RealmSyncSessionPointer
import io.realm.kotlin.internal.interop.SchemaMode
import io.realm.kotlin.internal.interop.SyncAfterClientResetHandler
import io.realm.kotlin.internal.interop.SyncBeforeClientResetHandler
import io.realm.kotlin.internal.interop.SyncErrorCallback
Expand Down Expand Up @@ -84,22 +88,22 @@ internal class SyncConfigurationImpl(
// unnecessary pressure on the server.
val fileExists: Boolean = fileExists(configuration.path)
val asyncOpenCreatedRealmFile: AtomicBoolean = atomic(false)
if (initialRemoteData != null && !fileExists) {

if ((!fileExists && initialRemoteData != null) || (fileExists && isSyncMigrationPending())) {
// Channel to work around not being able to use `suspendCoroutine` to wrap the callback, as
// that results in the `Continuation` being frozen, which breaks it.
val channel = Channel<Any>(1)
val taskPointer: AtomicRef<RealmAsyncOpenTaskPointer?> = atomic(null)
try {
val result: Any = withTimeout(initialRemoteData.timeout.inWholeMilliseconds) {
withContext(realm.notificationScheduler.dispatcher) {
val result: Any = withTimeout(initialRemoteData!!.timeout.inWholeMilliseconds) {
withContext(realm.writeScheduler.dispatcher) {
val callback = AsyncOpenCallback { error: Throwable? ->
if (error != null) {
channel.trySend(error)
} else {
channel.trySend(true)
}
}

val configPtr = createNativeConfiguration()
taskPointer.value = RealmInterop.realm_open_synchronized(configPtr)
RealmInterop.realm_async_open_task_start(taskPointer.value!!, callback)
Expand Down Expand Up @@ -138,6 +142,37 @@ internal class SyncConfigurationImpl(
return Pair(result.first, result.second || asyncOpenCreatedRealmFile.value)
}

/**
* Checks whether a sync Realm requires a migration, this happens when the schema version provided in
* the config differs from the Realm one.
*
* Opening a Realm with a config set to SchemaMode::Immutable would validate that the schema versions
* match throwing an error if they differ.
*
* Immutable schema mode is only compatible with local Realms.
*/
private fun isSyncMigrationPending(): Boolean =
try {
// We need to open synced Realm as local to be able to use `RLM_SCHEMA_MODE_IMMUTABLE`
// RLM_SCHEMA_MODE_IMMUTABLE would throw if the persisted realm and configured schema versions
// differ.
val config = configuration.createNativeConfiguration()
RealmInterop.realm_config_set_schema_mode(
config = config,
mode = SchemaMode.RLM_SCHEMA_MODE_IMMUTABLE
)

val realmPtr: NativePointer<LiveRealmT> = RealmInterop.realm_open(config)
RealmInterop.realm_close(realmPtr)
logger.debug("Sync migration not required")
false
} catch (e: InvalidSchemaException) {
logger.debug("Sync migration required: ${e.message}")
true
} catch (e: Exception) {
throw e
}

override suspend fun initializeRealmData(realm: RealmImpl, realmFileCreated: Boolean) {
// Create or update subscriptions for Flexible Sync realms as needed.
initialSubscriptions?.let { initialSubscriptionsConfig ->
Expand Down Expand Up @@ -173,7 +208,7 @@ internal class SyncConfigurationImpl(
return syncInitializer(ptr)
}

private val syncInitializer: (RealmConfigurationPointer) -> RealmConfigurationPointer
private var syncInitializer: (RealmConfigurationPointer) -> RealmConfigurationPointer

init {
// We need to freeze `errorHandler` reference on initial thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.realm.kotlin.mongodb.sync
import io.realm.kotlin.Configuration
import io.realm.kotlin.MutableRealm
import io.realm.kotlin.Realm
import io.realm.kotlin.RealmConfiguration
import io.realm.kotlin.TypedRealm
import io.realm.kotlin.internal.ConfigurationImpl
import io.realm.kotlin.internal.ContextLogger
Expand Down Expand Up @@ -453,6 +454,21 @@ public interface SyncConfiguration : Configuration {
)
}

/**
* Sets the schema version of the Realm. This must be equal to or higher than the schema
* version of the existing Realm file, if any. If the schema version is higher than the
* already existing Realm, a migration is needed.
*/
public fun schemaVersion(schemaVersion: Long, timeout: Duration = Duration.INFINITE): Builder {
if (schemaVersion < 0) {
throw IllegalArgumentException("Realm schema version numbers must be 0 (zero) or higher. Yours was: $schemaVersion")
}
return apply {
this.schemaVersion = schemaVersion
this.waitForServerChanges = InitialRemoteDataConfiguration(timeout)
}
}

@Suppress("LongMethod")
override fun build(): SyncConfiguration {
val realmLogger = ContextLogger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class AppAdminImpl(

override suspend fun waitForSyncBootstrap() {
baasClient.run {
var limit = 300
val limit = 300
var i = 0
while (!app.initialSyncComplete() && i < limit) {
delay(1.seconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ import kotlinx.serialization.json.jsonPrimitive
import kotlinx.serialization.json.put
import kotlinx.serialization.serializer
import kotlin.reflect.KClass
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration.Companion.minutes

private const val ADMIN_PATH = "/api/admin/v3.0"
private const val PRIVATE_PATH = "/api/private/v1.0"
Expand Down Expand Up @@ -403,7 +405,7 @@ class AppServicesClient(
suspend fun BaasApp.setSchema(
schema: Set<KClass<out BaseRealmObject>>,
extraProperties: Map<String, PrimitivePropertyType.Type> = emptyMap()
) {
): Map<String, String> {
val schemas = SchemaProcessor.process(
databaseName = clientAppId,
classes = schema,
Expand All @@ -423,6 +425,29 @@ class AppServicesClient(
schema = schema
)
}

return ids
}

suspend fun BaasApp.updateSchema(
ids: Map<String, String>,
schema: Set<KClass<out BaseRealmObject>>,
extraProperties: Map<String, PrimitivePropertyType.Type> = emptyMap()
) {
val schemas = SchemaProcessor.process(
databaseName = clientAppId,
classes = schema,
extraProperties = extraProperties
)

// then we update the schema to add the relationships
schemas.forEach { (name, schema) ->
updateSchema(
id = ids[name]!!,
schema = schema,
bypassServiceChange = true
)
}
}

suspend fun BaasApp.addFunction(function: Function): Function =
Expand All @@ -436,13 +461,44 @@ class AppServicesClient(
}
}

suspend fun BaasApp.waitForSchemaVersion(expectedVersion: Int) {
return withTimeout(1.minutes) {
withContext(dispatcher) {
while (true) {
val response = httpClient.typedRequest<JsonObject>(
Get,
"$url/sync/schemas/versions"
)

response["versions"]!!.jsonArray.forEach { version ->
if (version.jsonObject["version_major"]!!.jsonPrimitive.int >= expectedVersion) {
return@withContext
}
}
}
}
}
}

suspend fun BaasApp.deleteSchema(
id: String,
): HttpResponse =
withContext(dispatcher) {
httpClient.request(
"$url/schemas/$id"//?bypass_service_change=SyncSchemaVersionIncrease}"
) {
this.method = HttpMethod.Delete
}
}

suspend fun BaasApp.updateSchema(
id: String,
schema: Schema,
bypassServiceChange: Boolean = false,
): HttpResponse =
withContext(dispatcher) {
httpClient.request(
"$url/schemas/$id"
"$url/schemas/$id${if (bypassServiceChange) "?bypass_service_change=SyncSchemaVersionIncrease" else ""}"
) {
this.method = HttpMethod.Put
setBody(json.encodeToJsonElement(schema))
Expand Down
Loading

0 comments on commit fa96c03

Please sign in to comment.