Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mls): fetch other user supported protocols when resolving 1:1 conversation [WPB-5048] #2405

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ internal class UserDataSource internal constructor(

override suspend fun fetchAllOtherUsers(): Either<CoreFailure, Unit> {
val ids = userDAO.allOtherUsersId().map(UserIDEntity::toModel).toSet()

return fetchUsersByIds(ids)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ import com.wire.kalium.logic.feature.user.SyncSelfUserUseCase
import com.wire.kalium.logic.feature.user.SyncSelfUserUseCaseImpl
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsAndResolveOneOnOnesUseCase
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsAndResolveOneOnOnesUseCaseImpl
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsUseCase
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsUseCaseImpl
import com.wire.kalium.logic.feature.user.UpdateSelfUserSupportedProtocolsUseCase
import com.wire.kalium.logic.feature.user.UpdateSelfUserSupportedProtocolsUseCaseImpl
import com.wire.kalium.logic.feature.user.UserScope
import com.wire.kalium.logic.feature.user.e2ei.MarkNotifyForRevokedCertificateAsNotifiedUseCase
import com.wire.kalium.logic.feature.user.e2ei.MarkNotifyForRevokedCertificateAsNotifiedUseCaseImpl
Expand Down Expand Up @@ -967,8 +967,8 @@ class UserSessionScope internal constructor(
incrementalSyncRepository
)

private val updateSupportedProtocols: UpdateSupportedProtocolsUseCase
get() = UpdateSupportedProtocolsUseCaseImpl(
private val updateSupportedProtocols: UpdateSelfUserSupportedProtocolsUseCase
get() = UpdateSelfUserSupportedProtocolsUseCaseImpl(
clientRepository,
userRepository,
userConfigRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ internal class AcceptConnectionRequestUseCaseImpl(
)
}.flatMap {
oneOnOneResolver.resolveOneOnOneConversationWithUserId(
connection.qualifiedToId
userId = connection.qualifiedToId,
invalidateCurrentKnownProtocols = true
).map { }
}.flatMap {
newGroupConversationSystemMessagesCreator.conversationStartedUnverifiedWarning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
private suspend fun resolveOneOnOneConversationWithUser(otherUserId: UserId): Either<CoreFailure, Conversation> =
(userRepository.getKnownUser(otherUserId).first()?.let { otherUser ->
// TODO support lazily establishing mls group for team 1-1
oneOnOneResolver.resolveOneOnOneConversationWithUser(otherUser).flatMap {
oneOnOneResolver.resolveOneOnOneConversationWithUser(
user = otherUser,
invalidateCurrentKnownProtocols = true
).flatMap {
conversationRepository.getConversationById(it)?.let { Either.Right(it) } ?: Either.Left(StorageFailure.DataNotFound)
}
} ?: Either.Left(StorageFailure.DataNotFound))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ internal class NotifyConversationIsOpenUseCaseImpl(
kaliumLogger.v(
"Reevaluating protocol for 1:1 conversation with ID: ${conversationId.toLogString()}"
)
oneOnOneResolver.resolveOneOnOneConversationWithUser(conversation.otherUser)
oneOnOneResolver.resolveOneOnOneConversationWithUser(
user = conversation.otherUser,
invalidateCurrentKnownProtocols = true
)
}

// Delete Ephemeral Messages that has passed the end date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,32 @@ import kotlin.time.Duration
interface OneOnOneResolver {
suspend fun resolveAllOneOnOneConversations(synchronizeUsers: Boolean = false): Either<CoreFailure, Unit>
suspend fun scheduleResolveOneOnOneConversationWithUserId(userId: UserId, delay: Duration = Duration.ZERO): Job
suspend fun resolveOneOnOneConversationWithUserId(userId: UserId): Either<CoreFailure, ConversationId>
suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Resolves a one-on-one conversation with a user based on their userId.
*
* @param userId The userId of the other user in the conversation.
* @param invalidateCurrentKnownProtocols Flag indicating whether to whether it should attempt refreshing the other user's list of
* supported protocols by fetching from remote. In case of failure, the local result will be used as a fallback.
* @return Either a [CoreFailure] if there is an error or a [ConversationId] if the resolution is successful.
*/
suspend fun resolveOneOnOneConversationWithUserId(
userId: UserId,
invalidateCurrentKnownProtocols: Boolean,
): Either<CoreFailure, ConversationId>

/**
* Resolves a one-on-one conversation with a user.
*
* @param user The other user in the conversation.
* @param invalidateCurrentKnownProtocols Flag indicating whether to whether it should attempt refreshing the other user's list of
* supported protocols by fetching from remote. In case of failure, the local result will be used as a fallback.
* @return Either a [CoreFailure] if there is an error or a [ConversationId] if the resolution is successful.
*/
suspend fun resolveOneOnOneConversationWithUser(
user: OtherUser,
invalidateCurrentKnownProtocols: Boolean,
): Either<CoreFailure, ConversationId>
}

internal class OneOnOneResolverImpl(
Expand All @@ -62,52 +86,74 @@ internal class OneOnOneResolverImpl(

@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher = kaliumDispatcher.default.limitedParallelism(1)

// TODO: inherit the scope of UserSessionScope so it's cancelled if user logs out, etc.
private val resolveActiveOneOnOneScope = CoroutineScope(dispatcher)

override suspend fun resolveAllOneOnOneConversations(synchronizeUsers: Boolean): Either<CoreFailure, Unit> =
if (synchronizeUsers) {
userRepository.fetchAllOtherUsers()
} else {
Either.Right(Unit)
}.flatMap {
fetchAllOtherUsersIfNeeded(synchronizeUsers).flatMap {
val usersWithOneOnOne = userRepository.getUsersWithOneOnOneConversation()
kaliumLogger.i("Resolving one-on-one protocol for ${usersWithOneOnOne.size} user(s)")
usersWithOneOnOne.foldToEitherWhileRight(Unit) { item, _ ->
resolveOneOnOneConversationWithUser(item).flatMapLeft {
when (it) {
is CoreFailure.NoKeyPackagesAvailable,
is NetworkFailure.ServerMiscommunication,
is NetworkFailure.FederatedBackendFailure,
is CoreFailure.NoCommonProtocolFound
-> {
kaliumLogger.e("Resolving one-on-one failed $it, skipping")
Either.Right(Unit)
}

else -> {
kaliumLogger.e("Resolving one-on-one failed $it, retrying")
Either.Left(it)
}
}
resolveOneOnOneConversationWithUser(
user = item,
// Either it fetched all users on the previous step, or it's not needed
invalidateCurrentKnownProtocols = false
).flatMapLeft {
handleBatchEntryFailure(it)
}.map { }
}
}

private fun handleBatchEntryFailure(it: CoreFailure) = when (it) {
is CoreFailure.NoKeyPackagesAvailable,
is NetworkFailure.ServerMiscommunication,
is NetworkFailure.FederatedBackendFailure,
is CoreFailure.NoCommonProtocolFound
-> {
kaliumLogger.e("Resolving one-on-one failed $it, skipping")
Either.Right(Unit)
}

else -> {
kaliumLogger.e("Resolving one-on-one failed $it, retrying")
Either.Left(it)
}
}

private suspend fun fetchAllOtherUsersIfNeeded(synchronizeUsers: Boolean) = if (synchronizeUsers) {
userRepository.fetchAllOtherUsers()
} else {
Either.Right(Unit)
}

override suspend fun scheduleResolveOneOnOneConversationWithUserId(userId: UserId, delay: Duration) =
resolveActiveOneOnOneScope.launch {
kaliumLogger.d("Schedule resolving active one-on-one")
incrementalSyncRepository.incrementalSyncState.first { it is IncrementalSyncStatus.Live }
delay(delay)
resolveOneOnOneConversationWithUserId(userId)
resolveOneOnOneConversationWithUserId(
userId = userId,
invalidateCurrentKnownProtocols = true
)
}

override suspend fun resolveOneOnOneConversationWithUserId(userId: UserId): Either<CoreFailure, ConversationId> =
override suspend fun resolveOneOnOneConversationWithUserId(
userId: UserId,
invalidateCurrentKnownProtocols: Boolean
): Either<CoreFailure, ConversationId> =
userRepository.getKnownUser(userId).firstOrNull()?.let {
resolveOneOnOneConversationWithUser(it)
resolveOneOnOneConversationWithUser(it, invalidateCurrentKnownProtocols)
} ?: Either.Left(StorageFailure.DataNotFound)

override suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId> {
override suspend fun resolveOneOnOneConversationWithUser(
user: OtherUser,
invalidateCurrentKnownProtocols: Boolean,
): Either<CoreFailure, ConversationId> {
kaliumLogger.i("Resolving one-on-one protocol for ${user.id.toLogString()}")
if (invalidateCurrentKnownProtocols) {
userRepository.fetchUsersByIds(setOf(user.id))
}
return oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
when (supportedProtocol) {
SupportedProtocol.PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ import kotlinx.datetime.Instant
/**
* Updates the supported protocols of the current user.
*/
interface UpdateSupportedProtocolsUseCase {
interface UpdateSelfUserSupportedProtocolsUseCase {
suspend operator fun invoke(): Either<CoreFailure, Boolean>
}

internal class UpdateSupportedProtocolsUseCaseImpl(
internal class UpdateSelfUserSupportedProtocolsUseCaseImpl(
private val clientsRepository: ClientRepository,
private val userRepository: UserRepository,
private val userConfigRepository: UserConfigRepository,
private val featureSupport: FeatureSupport
) : UpdateSupportedProtocolsUseCase {
) : UpdateSelfUserSupportedProtocolsUseCase {

override suspend operator fun invoke(): Either<CoreFailure, Boolean> {
return if (!featureSupport.isMLSSupported) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ interface UpdateSupportedProtocolsAndResolveOneOnOnesUseCase {
}

internal class UpdateSupportedProtocolsAndResolveOneOnOnesUseCaseImpl(
private val updateSupportedProtocols: UpdateSupportedProtocolsUseCase,
private val updateSupportedProtocols: UpdateSelfUserSupportedProtocolsUseCase,
private val oneOnOneResolver: OneOnOneResolver
) : UpdateSupportedProtocolsAndResolveOneOnOnesUseCase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class UserScope internal constructor(
private val e2EIRepository: E2EIRepository,
private val mlsConversationRepository: MLSConversationRepository,
private val isSelfATeamMember: IsSelfATeamMemberUseCase,
private val updateSupportedProtocolsUseCase: UpdateSupportedProtocolsUseCase,
private val updateSelfUserSupportedProtocolsUseCase: UpdateSelfUserSupportedProtocolsUseCase,
) {
private val validateUserHandleUseCase: ValidateUserHandleUseCase get() = ValidateUserHandleUseCaseImpl()
val getSelfUser: GetSelfUserUseCase get() = GetSelfUserUseCaseImpl(userRepository)
Expand Down Expand Up @@ -179,7 +179,7 @@ class UserScope internal constructor(

val deleteAccount: DeleteAccountUseCase get() = DeleteAccountUseCase(accountRepository)

val updateSupportedProtocols: UpdateSupportedProtocolsUseCase get() = updateSupportedProtocolsUseCase
val updateSupportedProtocols: UpdateSelfUserSupportedProtocolsUseCase get() = updateSelfUserSupportedProtocolsUseCase

val observeCertificateRevocationForSelfClient: ObserveCertificateRevocationForSelfClientUseCase
get() = ObserveCertificateRevocationForSelfClientUseCaseImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ internal class MLSWelcomeEventHandlerImpl(
.first()
.flatMap {
if (it is ConversationDetails.OneOne) {
oneOnOneResolver.resolveOneOnOneConversationWithUser(it.otherUser).map { Unit }
oneOnOneResolver.resolveOneOnOneConversationWithUser(
user = it.otherUser,
invalidateCurrentKnownProtocols = true
).map { Unit }
} else {
Either.Right(Unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ internal class NewConversationEventHandlerImpl(
private suspend fun resolveConversationIfOneOnOne(selfUserTeamId: TeamId?, event: Event.Conversation.NewConversation) =
if (event.conversation.toConversationType(selfUserTeamId) == ConversationEntity.Type.ONE_ON_ONE) {
val otherUserId = event.conversation.members.otherMembers.first().id.toModel()
oneOnOneResolver.resolveOneOnOneConversationWithUserId(otherUserId).map { Unit }
oneOnOneResolver.resolveOneOnOneConversationWithUserId(
userId = otherUserId,
invalidateCurrentKnownProtocols = true
).map { Unit }
} else Either.Right(Unit)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.wire.kalium.logic.feature.legalhold.FetchLegalHoldForSelfUserFromRemo
import com.wire.kalium.logic.feature.team.SyncSelfTeamUseCase
import com.wire.kalium.logic.feature.user.SyncContactsUseCase
import com.wire.kalium.logic.feature.user.SyncSelfUserUseCase
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsUseCase
import com.wire.kalium.logic.feature.user.UpdateSelfUserSupportedProtocolsUseCase
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.foldToEitherWhileRight
Expand Down Expand Up @@ -62,7 +62,7 @@ internal class SlowSyncWorkerImpl(
private val eventRepository: EventRepository,
private val syncSelfUser: SyncSelfUserUseCase,
private val syncFeatureConfigs: SyncFeatureConfigsUseCase,
private val updateSupportedProtocols: UpdateSupportedProtocolsUseCase,
private val updateSupportedProtocols: UpdateSelfUserSupportedProtocolsUseCase,
private val syncConversations: SyncConversationsUseCase,
private val syncConnections: SyncConnectionsUseCase,
private val syncSelfTeam: SyncSelfTeamUseCase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class AcceptConnectionRequestUseCaseTest {
assertEquals(AcceptConnectionRequestUseCaseResult.Success, result)
verify(arrangement.oneOnOneResolver)
.suspendFunction(arrangement.oneOnOneResolver::resolveOneOnOneConversationWithUserId)
.with(eq(CONNECTION.qualifiedToId))
.with(eq(CONNECTION.qualifiedToId), eq(true))
.wasInvoked(once)
}

Expand Down
Loading
Loading