Skip to content

Commit

Permalink
Merge branch 'feat/e2ei/respect-e2ei-creating-mls-client' of https://…
Browse files Browse the repository at this point in the history
…github.com/wireapp/kalium into feat/e2ei/uprade-cc-rc33

# Conflicts:
#	logic/src/commonMain/kotlin/com/wire/kalium/logic/data/e2ei/AcmeAuthorization.kt
#	logic/src/commonTest/kotlin/com/wire/kalium/logic/data/conversation/MLSConversationRepositoryTest.kt
#	logic/src/commonTest/kotlin/com/wire/kalium/logic/data/e2ei/E2EIRepositoryTest.kt
#	logic/src/commonTest/kotlin/com/wire/kalium/logic/feature/e2ei/EnrollE2EICertificateUseCaseTest.kt
  • Loading branch information
mchenani committed Jan 26, 2024
2 parents fabb198 + 9027919 commit 2283311
Show file tree
Hide file tree
Showing 38 changed files with 375 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.conversation.ConversationDetails
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.event.Event
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.toApi
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.user.Connection
Expand Down Expand Up @@ -54,6 +53,7 @@ import com.wire.kalium.network.api.base.authenticated.connection.ConnectionApi
import com.wire.kalium.network.api.base.authenticated.connection.ConnectionDTO
import com.wire.kalium.network.api.base.authenticated.connection.ConnectionStateDTO
import com.wire.kalium.persistence.dao.ConnectionDAO
import com.wire.kalium.persistence.dao.ConnectionEntity
import com.wire.kalium.persistence.dao.UserDAO
import com.wire.kalium.persistence.dao.conversation.ConversationDAO
import com.wire.kalium.persistence.dao.conversation.ConversationEntity
Expand All @@ -74,7 +74,7 @@ interface ConnectionRepository {
suspend fun observeConnectionRequestsForNotification(): Flow<List<ConversationDetails>>
suspend fun setConnectionAsNotified(userId: UserId)
suspend fun setAllConnectionsAsNotified()
suspend fun deleteConnection(conversationId: ConversationId): Either<StorageFailure, Unit>
suspend fun deleteConnection(connection: Connection): Either<StorageFailure, Unit>
}

@Suppress("LongParameterList", "TooManyFunctions")
Expand Down Expand Up @@ -249,8 +249,9 @@ internal class ConnectionDataSource(
}
}

override suspend fun deleteConnection(conversationId: ConversationId) = wrapStorageRequest {
connectionDAO.deleteConnectionDataAndConversation(conversationId.toDao())
override suspend fun deleteConnection(connection: Connection) = wrapStorageRequest {
connectionDAO.deleteConnectionDataAndConversation(connection.qualifiedConversationId.toDao())
userDAO.upsertConnectionStatuses(mapOf(connection.qualifiedToId.toDao() to ConnectionEntity.State.CANCELLED))
}

/**
Expand All @@ -260,6 +261,6 @@ internal class ConnectionDataSource(
private suspend fun handleUserConnectionStatusPersistence(connection: Connection): Either<CoreFailure, Unit> =
when (connection.status) {
ACCEPTED, MISSING_LEGALHOLD_CONSENT, NOT_CONNECTED, PENDING, SENT, BLOCKED, IGNORED -> persistConnection(connection)
CANCELLED -> deleteConnection(connection.qualifiedConversationId)
CANCELLED -> deleteConnection(connection)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,12 @@ internal class CertificateRevocationListRepositoryDataSource(
metadataDAO.getSerializable(CRL_LIST_KEY, CRLUrlExpirationList.serializer())

override suspend fun addOrUpdateCRL(url: String, timestamp: ULong) {

metadataDAO.getSerializable(CRL_LIST_KEY, CRLUrlExpirationList.serializer())
val newCRLUrls = metadataDAO.getSerializable(CRL_LIST_KEY, CRLUrlExpirationList.serializer())
?.let { crlExpirationList ->
val crlWithExpiration = crlExpirationList.cRLWithExpirationList.find {
it.url == url
}
val newCRLs = crlWithExpiration?.let { item ->
crlWithExpiration?.let { item ->
crlExpirationList.cRLWithExpirationList.map { current ->
if (current.url == url) {
return@map item.copy(expiration = timestamp)
Expand All @@ -72,12 +71,15 @@ internal class CertificateRevocationListRepositoryDataSource(
)
}

metadataDAO.putSerializable(
CRL_LIST_KEY,
CRLUrlExpirationList(newCRLs),
CRLUrlExpirationList.serializer()
)
}
} ?: run {
// add new CRL
listOf(CRLWithExpiration(url, timestamp))
}
metadataDAO.putSerializable(
CRL_LIST_KEY,
CRLUrlExpirationList(newCRLUrls),
CRLUrlExpirationList.serializer()
)
}

override suspend fun getCurrentClientCrlUrl(): Either<CoreFailure, String> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ internal class UserDataSource internal constructor(

override suspend fun fetchAllOtherUsers(): Either<CoreFailure, Unit> {
val ids = userDAO.allOtherUsersId().map(UserIDEntity::toModel).toSet()

return fetchUsersByIds(ids)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ import com.wire.kalium.logic.feature.user.SyncSelfUserUseCase
import com.wire.kalium.logic.feature.user.SyncSelfUserUseCaseImpl
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsAndResolveOneOnOnesUseCase
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsAndResolveOneOnOnesUseCaseImpl
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsUseCase
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsUseCaseImpl
import com.wire.kalium.logic.feature.user.UpdateSelfUserSupportedProtocolsUseCase
import com.wire.kalium.logic.feature.user.UpdateSelfUserSupportedProtocolsUseCaseImpl
import com.wire.kalium.logic.feature.user.UserScope
import com.wire.kalium.logic.feature.user.e2ei.MarkNotifyForRevokedCertificateAsNotifiedUseCase
import com.wire.kalium.logic.feature.user.e2ei.MarkNotifyForRevokedCertificateAsNotifiedUseCaseImpl
Expand Down Expand Up @@ -979,8 +979,8 @@ class UserSessionScope internal constructor(
incrementalSyncRepository
)

private val updateSupportedProtocols: UpdateSupportedProtocolsUseCase
get() = UpdateSupportedProtocolsUseCaseImpl(
private val updateSupportedProtocols: UpdateSelfUserSupportedProtocolsUseCase
get() = UpdateSelfUserSupportedProtocolsUseCaseImpl(
clientRepository,
userRepository,
userConfigRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import com.wire.kalium.util.DelicateKaliumApi

@Suppress("LongParameterList")
class ClientScope @OptIn(DelicateKaliumApi::class) internal constructor(
val clientRepository: ClientRepository,
val clientRepository: ClientRepository,
private val pushTokenRepository: PushTokenRepository,
private val logoutRepository: LogoutRepository,
private val preKeyRepository: PreKeyRepository,
Expand All @@ -61,7 +61,7 @@ class ClientScope @OptIn(DelicateKaliumApi::class) internal constructor(
private val clientRemoteRepository: ClientRemoteRepository,
private val proteusClientProvider: ProteusClientProvider,
private val sessionRepository: SessionRepository,
val upgradeCurrentSessionUseCase: UpgradeCurrentSessionUseCase,
private val upgradeCurrentSessionUseCase: UpgradeCurrentSessionUseCase,
private val selfUserId: UserId,
private val isAllowedToRegisterMLSClient: IsAllowedToRegisterMLSClientUseCase,
private val clientIdProvider: CurrentClientIdProvider,
Expand All @@ -70,7 +70,7 @@ class ClientScope @OptIn(DelicateKaliumApi::class) internal constructor(
private val slowSyncRepository: SlowSyncRepository,
private val cachedClientIdClearer: CachedClientIdClearer,
private val updateSupportedProtocolsAndResolveOneOnOnes: UpdateSupportedProtocolsAndResolveOneOnOnesUseCase,
val registerMLSClientUseCase: RegisterMLSClientUseCase,
private val registerMLSClientUseCase: RegisterMLSClientUseCase,
private val syncFeatureConfigsUseCase: SyncFeatureConfigsUseCase
) {
@OptIn(DelicateKaliumApi::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
package com.wire.kalium.logic.feature.client

import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.feature.user.screenshotCensoring.ObserveScreenshotCensoringConfigResult
import kotlinx.coroutines.flow.Flow

interface ObserveIsE2EIRequiredState {
suspend operator fun invoke(): Flow<Boolean?>
}
internal class ObserveIsE2EIRequiredStateImpl(
val clientRepository: ClientRepository
): ObserveIsE2EIRequiredState{
internal class ObserveIsE2EIRequiredStateImpl(val clientRepository: ClientRepository) : ObserveIsE2EIRequiredState {
override suspend fun invoke() = clientRepository.observeIsClientRegistrationBlockedByE2EI()
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class RegisterClientUseCaseImpl @OptIn(DelicateKaliumApi::class) internal constr
RegisterClientResult.Failure.Generic(it)
}, { registerClientParam ->
clientRepository.registerClient(registerClientParam)
//todo? separate this in mls client usesCase register! separate everything
// todo? separate this in mls client usesCase register! separate everything
.flatMap { registeredClient ->
if (isAllowedToRegisterMLSClient()) {
registerMLSClientUseCase.invoke(clientId = registeredClient.id).flatMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package com.wire.kalium.logic.feature.client
import com.wire.kalium.cryptography.MLSClient
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.configuration.UserConfigRepository
import com.wire.kalium.logic.data.client.Client
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.client.MLSClientProvider
import com.wire.kalium.logic.data.conversation.ClientId
Expand All @@ -30,9 +29,6 @@ import com.wire.kalium.logic.data.keypackage.KeyPackageRepository
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger

sealed class RegisterMLSClientResult {
Expand Down Expand Up @@ -62,9 +58,7 @@ internal class RegisterMLSClientUseCaseImpl(
userConfigRepository.getE2EISettings().fold({
Either.Right(mlsClient)
}, { e2eiSettings ->
kaliumLogger.e("### e2ei config: ${e2eiSettings}")
if (e2eiSettings.isRequired && !mlsClient.isE2EIEnabled()) {
kaliumLogger.i("##### ${clientId.value}")
kaliumLogger.i("MLS Client registration stopped: e2ei is required and is not enrolled!")
return Either.Right(RegisterMLSClientResult.E2EICertificateRequired(mlsClient))
} else Either.Right(mlsClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ internal class AcceptConnectionRequestUseCaseImpl(
)
}.flatMap {
oneOnOneResolver.resolveOneOnOneConversationWithUserId(
connection.qualifiedToId
userId = connection.qualifiedToId,
invalidateCurrentKnownProtocols = true
).map { }
}.flatMap {
newGroupConversationSystemMessagesCreator.conversationStartedUnverifiedWarning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
private suspend fun resolveOneOnOneConversationWithUser(otherUserId: UserId): Either<CoreFailure, Conversation> =
(userRepository.getKnownUser(otherUserId).first()?.let { otherUser ->
// TODO support lazily establishing mls group for team 1-1
oneOnOneResolver.resolveOneOnOneConversationWithUser(otherUser).flatMap {
oneOnOneResolver.resolveOneOnOneConversationWithUser(
user = otherUser,
invalidateCurrentKnownProtocols = true
).flatMap {
conversationRepository.getConversationById(it)?.let { Either.Right(it) } ?: Either.Left(StorageFailure.DataNotFound)
}
} ?: Either.Left(StorageFailure.DataNotFound))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ internal class NotifyConversationIsOpenUseCaseImpl(
kaliumLogger.v(
"Reevaluating protocol for 1:1 conversation with ID: ${conversationId.toLogString()}"
)
oneOnOneResolver.resolveOneOnOneConversationWithUser(conversation.otherUser)
oneOnOneResolver.resolveOneOnOneConversationWithUser(
user = conversation.otherUser,
invalidateCurrentKnownProtocols = true
)
}

// Delete Ephemeral Messages that has passed the end date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,32 @@ import kotlin.time.Duration
interface OneOnOneResolver {
suspend fun resolveAllOneOnOneConversations(synchronizeUsers: Boolean = false): Either<CoreFailure, Unit>
suspend fun scheduleResolveOneOnOneConversationWithUserId(userId: UserId, delay: Duration = Duration.ZERO): Job
suspend fun resolveOneOnOneConversationWithUserId(userId: UserId): Either<CoreFailure, ConversationId>
suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Resolves a one-on-one conversation with a user based on their userId.
*
* @param userId The userId of the other user in the conversation.
* @param invalidateCurrentKnownProtocols Flag indicating whether to whether it should attempt refreshing the other user's list of
* supported protocols by fetching from remote. In case of failure, the local result will be used as a fallback.
* @return Either a [CoreFailure] if there is an error or a [ConversationId] if the resolution is successful.
*/
suspend fun resolveOneOnOneConversationWithUserId(
userId: UserId,
invalidateCurrentKnownProtocols: Boolean,
): Either<CoreFailure, ConversationId>

/**
* Resolves a one-on-one conversation with a user.
*
* @param user The other user in the conversation.
* @param invalidateCurrentKnownProtocols Flag indicating whether to whether it should attempt refreshing the other user's list of
* supported protocols by fetching from remote. In case of failure, the local result will be used as a fallback.
* @return Either a [CoreFailure] if there is an error or a [ConversationId] if the resolution is successful.
*/
suspend fun resolveOneOnOneConversationWithUser(
user: OtherUser,
invalidateCurrentKnownProtocols: Boolean,
): Either<CoreFailure, ConversationId>
}

internal class OneOnOneResolverImpl(
Expand All @@ -62,52 +86,74 @@ internal class OneOnOneResolverImpl(

@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher = kaliumDispatcher.default.limitedParallelism(1)

// TODO: inherit the scope of UserSessionScope so it's cancelled if user logs out, etc.
private val resolveActiveOneOnOneScope = CoroutineScope(dispatcher)

override suspend fun resolveAllOneOnOneConversations(synchronizeUsers: Boolean): Either<CoreFailure, Unit> =
if (synchronizeUsers) {
userRepository.fetchAllOtherUsers()
} else {
Either.Right(Unit)
}.flatMap {
fetchAllOtherUsersIfNeeded(synchronizeUsers).flatMap {
val usersWithOneOnOne = userRepository.getUsersWithOneOnOneConversation()
kaliumLogger.i("Resolving one-on-one protocol for ${usersWithOneOnOne.size} user(s)")
usersWithOneOnOne.foldToEitherWhileRight(Unit) { item, _ ->
resolveOneOnOneConversationWithUser(item).flatMapLeft {
when (it) {
is CoreFailure.NoKeyPackagesAvailable,
is NetworkFailure.ServerMiscommunication,
is NetworkFailure.FederatedBackendFailure,
is CoreFailure.NoCommonProtocolFound
-> {
kaliumLogger.e("Resolving one-on-one failed $it, skipping")
Either.Right(Unit)
}

else -> {
kaliumLogger.e("Resolving one-on-one failed $it, retrying")
Either.Left(it)
}
}
resolveOneOnOneConversationWithUser(
user = item,
// Either it fetched all users on the previous step, or it's not needed
invalidateCurrentKnownProtocols = false
).flatMapLeft {
handleBatchEntryFailure(it)
}.map { }
}
}

private fun handleBatchEntryFailure(it: CoreFailure) = when (it) {
is CoreFailure.NoKeyPackagesAvailable,
is NetworkFailure.ServerMiscommunication,
is NetworkFailure.FederatedBackendFailure,
is CoreFailure.NoCommonProtocolFound
-> {
kaliumLogger.e("Resolving one-on-one failed $it, skipping")
Either.Right(Unit)
}

else -> {
kaliumLogger.e("Resolving one-on-one failed $it, retrying")
Either.Left(it)
}
}

private suspend fun fetchAllOtherUsersIfNeeded(synchronizeUsers: Boolean) = if (synchronizeUsers) {
userRepository.fetchAllOtherUsers()
} else {
Either.Right(Unit)
}

override suspend fun scheduleResolveOneOnOneConversationWithUserId(userId: UserId, delay: Duration) =
resolveActiveOneOnOneScope.launch {
kaliumLogger.d("Schedule resolving active one-on-one")
incrementalSyncRepository.incrementalSyncState.first { it is IncrementalSyncStatus.Live }
delay(delay)
resolveOneOnOneConversationWithUserId(userId)
resolveOneOnOneConversationWithUserId(
userId = userId,
invalidateCurrentKnownProtocols = true
)
}

override suspend fun resolveOneOnOneConversationWithUserId(userId: UserId): Either<CoreFailure, ConversationId> =
override suspend fun resolveOneOnOneConversationWithUserId(
userId: UserId,
invalidateCurrentKnownProtocols: Boolean
): Either<CoreFailure, ConversationId> =
userRepository.getKnownUser(userId).firstOrNull()?.let {
resolveOneOnOneConversationWithUser(it)
resolveOneOnOneConversationWithUser(it, invalidateCurrentKnownProtocols)
} ?: Either.Left(StorageFailure.DataNotFound)

override suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId> {
override suspend fun resolveOneOnOneConversationWithUser(
user: OtherUser,
invalidateCurrentKnownProtocols: Boolean,
): Either<CoreFailure, ConversationId> {
kaliumLogger.i("Resolving one-on-one protocol for ${user.id.toLogString()}")
if (invalidateCurrentKnownProtocols) {
userRepository.fetchUsersByIds(setOf(user.id))
}
return oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
when (supportedProtocol) {
SupportedProtocol.PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ sealed interface E2EIEnrollmentResult {
ConversationMigration,
Certificate
}

@Suppress("LongParameterList")
class Initialized(
val idpTarget: String,
val oAuthState: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ import kotlinx.datetime.Instant
/**
* Updates the supported protocols of the current user.
*/
interface UpdateSupportedProtocolsUseCase {
interface UpdateSelfUserSupportedProtocolsUseCase {
suspend operator fun invoke(): Either<CoreFailure, Boolean>
}

internal class UpdateSupportedProtocolsUseCaseImpl(
internal class UpdateSelfUserSupportedProtocolsUseCaseImpl(
private val clientsRepository: ClientRepository,
private val userRepository: UserRepository,
private val userConfigRepository: UserConfigRepository,
private val featureSupport: FeatureSupport
) : UpdateSupportedProtocolsUseCase {
) : UpdateSelfUserSupportedProtocolsUseCase {

override suspend operator fun invoke(): Either<CoreFailure, Boolean> {
return if (!featureSupport.isMLSSupported) {
Expand Down
Loading

0 comments on commit 2283311

Please sign in to comment.