Skip to content

Commit

Permalink
feat: fetch other user's protocols from remote when resolving 1:1 con…
Browse files Browse the repository at this point in the history
…versations (#2405)
  • Loading branch information
vitorhugods authored Jan 26, 2024
1 parent 457475e commit ee75ec7
Show file tree
Hide file tree
Showing 18 changed files with 252 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ internal class UserDataSource internal constructor(

override suspend fun fetchAllOtherUsers(): Either<CoreFailure, Unit> {
val ids = userDAO.allOtherUsersId().map(UserIDEntity::toModel).toSet()

return fetchUsersByIds(ids)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ import com.wire.kalium.logic.feature.user.SyncSelfUserUseCase
import com.wire.kalium.logic.feature.user.SyncSelfUserUseCaseImpl
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsAndResolveOneOnOnesUseCase
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsAndResolveOneOnOnesUseCaseImpl
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsUseCase
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsUseCaseImpl
import com.wire.kalium.logic.feature.user.UpdateSelfUserSupportedProtocolsUseCase
import com.wire.kalium.logic.feature.user.UpdateSelfUserSupportedProtocolsUseCaseImpl
import com.wire.kalium.logic.feature.user.UserScope
import com.wire.kalium.logic.feature.user.e2ei.MarkNotifyForRevokedCertificateAsNotifiedUseCase
import com.wire.kalium.logic.feature.user.e2ei.MarkNotifyForRevokedCertificateAsNotifiedUseCaseImpl
Expand Down Expand Up @@ -967,8 +967,8 @@ class UserSessionScope internal constructor(
incrementalSyncRepository
)

private val updateSupportedProtocols: UpdateSupportedProtocolsUseCase
get() = UpdateSupportedProtocolsUseCaseImpl(
private val updateSupportedProtocols: UpdateSelfUserSupportedProtocolsUseCase
get() = UpdateSelfUserSupportedProtocolsUseCaseImpl(
clientRepository,
userRepository,
userConfigRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ internal class AcceptConnectionRequestUseCaseImpl(
)
}.flatMap {
oneOnOneResolver.resolveOneOnOneConversationWithUserId(
connection.qualifiedToId
userId = connection.qualifiedToId,
invalidateCurrentKnownProtocols = true
).map { }
}.flatMap {
newGroupConversationSystemMessagesCreator.conversationStartedUnverifiedWarning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ internal class GetOrCreateOneToOneConversationUseCaseImpl(
private suspend fun resolveOneOnOneConversationWithUser(otherUserId: UserId): Either<CoreFailure, Conversation> =
(userRepository.getKnownUser(otherUserId).first()?.let { otherUser ->
// TODO support lazily establishing mls group for team 1-1
oneOnOneResolver.resolveOneOnOneConversationWithUser(otherUser).flatMap {
oneOnOneResolver.resolveOneOnOneConversationWithUser(
user = otherUser,
invalidateCurrentKnownProtocols = true
).flatMap {
conversationRepository.getConversationById(it)?.let { Either.Right(it) } ?: Either.Left(StorageFailure.DataNotFound)
}
} ?: Either.Left(StorageFailure.DataNotFound))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ internal class NotifyConversationIsOpenUseCaseImpl(
kaliumLogger.v(
"Reevaluating protocol for 1:1 conversation with ID: ${conversationId.toLogString()}"
)
oneOnOneResolver.resolveOneOnOneConversationWithUser(conversation.otherUser)
oneOnOneResolver.resolveOneOnOneConversationWithUser(
user = conversation.otherUser,
invalidateCurrentKnownProtocols = true
)
}

// Delete Ephemeral Messages that has passed the end date
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,32 @@ import kotlin.time.Duration
interface OneOnOneResolver {
suspend fun resolveAllOneOnOneConversations(synchronizeUsers: Boolean = false): Either<CoreFailure, Unit>
suspend fun scheduleResolveOneOnOneConversationWithUserId(userId: UserId, delay: Duration = Duration.ZERO): Job
suspend fun resolveOneOnOneConversationWithUserId(userId: UserId): Either<CoreFailure, ConversationId>
suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId>

/**
* Resolves a one-on-one conversation with a user based on their userId.
*
* @param userId The userId of the other user in the conversation.
* @param invalidateCurrentKnownProtocols Flag indicating whether to whether it should attempt refreshing the other user's list of
* supported protocols by fetching from remote. In case of failure, the local result will be used as a fallback.
* @return Either a [CoreFailure] if there is an error or a [ConversationId] if the resolution is successful.
*/
suspend fun resolveOneOnOneConversationWithUserId(
userId: UserId,
invalidateCurrentKnownProtocols: Boolean,
): Either<CoreFailure, ConversationId>

/**
* Resolves a one-on-one conversation with a user.
*
* @param user The other user in the conversation.
* @param invalidateCurrentKnownProtocols Flag indicating whether to whether it should attempt refreshing the other user's list of
* supported protocols by fetching from remote. In case of failure, the local result will be used as a fallback.
* @return Either a [CoreFailure] if there is an error or a [ConversationId] if the resolution is successful.
*/
suspend fun resolveOneOnOneConversationWithUser(
user: OtherUser,
invalidateCurrentKnownProtocols: Boolean,
): Either<CoreFailure, ConversationId>
}

internal class OneOnOneResolverImpl(
Expand All @@ -62,52 +86,74 @@ internal class OneOnOneResolverImpl(

@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher = kaliumDispatcher.default.limitedParallelism(1)

// TODO: inherit the scope of UserSessionScope so it's cancelled if user logs out, etc.
private val resolveActiveOneOnOneScope = CoroutineScope(dispatcher)

override suspend fun resolveAllOneOnOneConversations(synchronizeUsers: Boolean): Either<CoreFailure, Unit> =
if (synchronizeUsers) {
userRepository.fetchAllOtherUsers()
} else {
Either.Right(Unit)
}.flatMap {
fetchAllOtherUsersIfNeeded(synchronizeUsers).flatMap {
val usersWithOneOnOne = userRepository.getUsersWithOneOnOneConversation()
kaliumLogger.i("Resolving one-on-one protocol for ${usersWithOneOnOne.size} user(s)")
usersWithOneOnOne.foldToEitherWhileRight(Unit) { item, _ ->
resolveOneOnOneConversationWithUser(item).flatMapLeft {
when (it) {
is CoreFailure.NoKeyPackagesAvailable,
is NetworkFailure.ServerMiscommunication,
is NetworkFailure.FederatedBackendFailure,
is CoreFailure.NoCommonProtocolFound
-> {
kaliumLogger.e("Resolving one-on-one failed $it, skipping")
Either.Right(Unit)
}

else -> {
kaliumLogger.e("Resolving one-on-one failed $it, retrying")
Either.Left(it)
}
}
resolveOneOnOneConversationWithUser(
user = item,
// Either it fetched all users on the previous step, or it's not needed
invalidateCurrentKnownProtocols = false
).flatMapLeft {
handleBatchEntryFailure(it)
}.map { }
}
}

private fun handleBatchEntryFailure(it: CoreFailure) = when (it) {
is CoreFailure.NoKeyPackagesAvailable,
is NetworkFailure.ServerMiscommunication,
is NetworkFailure.FederatedBackendFailure,
is CoreFailure.NoCommonProtocolFound
-> {
kaliumLogger.e("Resolving one-on-one failed $it, skipping")
Either.Right(Unit)
}

else -> {
kaliumLogger.e("Resolving one-on-one failed $it, retrying")
Either.Left(it)
}
}

private suspend fun fetchAllOtherUsersIfNeeded(synchronizeUsers: Boolean) = if (synchronizeUsers) {
userRepository.fetchAllOtherUsers()
} else {
Either.Right(Unit)
}

override suspend fun scheduleResolveOneOnOneConversationWithUserId(userId: UserId, delay: Duration) =
resolveActiveOneOnOneScope.launch {
kaliumLogger.d("Schedule resolving active one-on-one")
incrementalSyncRepository.incrementalSyncState.first { it is IncrementalSyncStatus.Live }
delay(delay)
resolveOneOnOneConversationWithUserId(userId)
resolveOneOnOneConversationWithUserId(
userId = userId,
invalidateCurrentKnownProtocols = true
)
}

override suspend fun resolveOneOnOneConversationWithUserId(userId: UserId): Either<CoreFailure, ConversationId> =
override suspend fun resolveOneOnOneConversationWithUserId(
userId: UserId,
invalidateCurrentKnownProtocols: Boolean
): Either<CoreFailure, ConversationId> =
userRepository.getKnownUser(userId).firstOrNull()?.let {
resolveOneOnOneConversationWithUser(it)
resolveOneOnOneConversationWithUser(it, invalidateCurrentKnownProtocols)
} ?: Either.Left(StorageFailure.DataNotFound)

override suspend fun resolveOneOnOneConversationWithUser(user: OtherUser): Either<CoreFailure, ConversationId> {
override suspend fun resolveOneOnOneConversationWithUser(
user: OtherUser,
invalidateCurrentKnownProtocols: Boolean,
): Either<CoreFailure, ConversationId> {
kaliumLogger.i("Resolving one-on-one protocol for ${user.id.toLogString()}")
if (invalidateCurrentKnownProtocols) {
userRepository.fetchUsersByIds(setOf(user.id))
}
return oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
when (supportedProtocol) {
SupportedProtocol.PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ import kotlinx.datetime.Instant
/**
* Updates the supported protocols of the current user.
*/
interface UpdateSupportedProtocolsUseCase {
interface UpdateSelfUserSupportedProtocolsUseCase {
suspend operator fun invoke(): Either<CoreFailure, Boolean>
}

internal class UpdateSupportedProtocolsUseCaseImpl(
internal class UpdateSelfUserSupportedProtocolsUseCaseImpl(
private val clientsRepository: ClientRepository,
private val userRepository: UserRepository,
private val userConfigRepository: UserConfigRepository,
private val featureSupport: FeatureSupport
) : UpdateSupportedProtocolsUseCase {
) : UpdateSelfUserSupportedProtocolsUseCase {

override suspend operator fun invoke(): Either<CoreFailure, Boolean> {
return if (!featureSupport.isMLSSupported) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ interface UpdateSupportedProtocolsAndResolveOneOnOnesUseCase {
}

internal class UpdateSupportedProtocolsAndResolveOneOnOnesUseCaseImpl(
private val updateSupportedProtocols: UpdateSupportedProtocolsUseCase,
private val updateSupportedProtocols: UpdateSelfUserSupportedProtocolsUseCase,
private val oneOnOneResolver: OneOnOneResolver
) : UpdateSupportedProtocolsAndResolveOneOnOnesUseCase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class UserScope internal constructor(
private val e2EIRepository: E2EIRepository,
private val mlsConversationRepository: MLSConversationRepository,
private val isSelfATeamMember: IsSelfATeamMemberUseCase,
private val updateSupportedProtocolsUseCase: UpdateSupportedProtocolsUseCase,
private val updateSelfUserSupportedProtocolsUseCase: UpdateSelfUserSupportedProtocolsUseCase,
) {
private val validateUserHandleUseCase: ValidateUserHandleUseCase get() = ValidateUserHandleUseCaseImpl()
val getSelfUser: GetSelfUserUseCase get() = GetSelfUserUseCaseImpl(userRepository)
Expand Down Expand Up @@ -179,7 +179,7 @@ class UserScope internal constructor(

val deleteAccount: DeleteAccountUseCase get() = DeleteAccountUseCase(accountRepository)

val updateSupportedProtocols: UpdateSupportedProtocolsUseCase get() = updateSupportedProtocolsUseCase
val updateSupportedProtocols: UpdateSelfUserSupportedProtocolsUseCase get() = updateSelfUserSupportedProtocolsUseCase

val observeCertificateRevocationForSelfClient: ObserveCertificateRevocationForSelfClientUseCase
get() = ObserveCertificateRevocationForSelfClientUseCaseImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ internal class MLSWelcomeEventHandlerImpl(
.first()
.flatMap {
if (it is ConversationDetails.OneOne) {
oneOnOneResolver.resolveOneOnOneConversationWithUser(it.otherUser).map { Unit }
oneOnOneResolver.resolveOneOnOneConversationWithUser(
user = it.otherUser,
invalidateCurrentKnownProtocols = true
).map { Unit }
} else {
Either.Right(Unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ internal class NewConversationEventHandlerImpl(
private suspend fun resolveConversationIfOneOnOne(selfUserTeamId: TeamId?, event: Event.Conversation.NewConversation) =
if (event.conversation.toConversationType(selfUserTeamId) == ConversationEntity.Type.ONE_ON_ONE) {
val otherUserId = event.conversation.members.otherMembers.first().id.toModel()
oneOnOneResolver.resolveOneOnOneConversationWithUserId(otherUserId).map { Unit }
oneOnOneResolver.resolveOneOnOneConversationWithUserId(
userId = otherUserId,
invalidateCurrentKnownProtocols = true
).map { Unit }
} else Either.Right(Unit)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.wire.kalium.logic.feature.legalhold.FetchLegalHoldForSelfUserFromRemo
import com.wire.kalium.logic.feature.team.SyncSelfTeamUseCase
import com.wire.kalium.logic.feature.user.SyncContactsUseCase
import com.wire.kalium.logic.feature.user.SyncSelfUserUseCase
import com.wire.kalium.logic.feature.user.UpdateSupportedProtocolsUseCase
import com.wire.kalium.logic.feature.user.UpdateSelfUserSupportedProtocolsUseCase
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.foldToEitherWhileRight
Expand Down Expand Up @@ -62,7 +62,7 @@ internal class SlowSyncWorkerImpl(
private val eventRepository: EventRepository,
private val syncSelfUser: SyncSelfUserUseCase,
private val syncFeatureConfigs: SyncFeatureConfigsUseCase,
private val updateSupportedProtocols: UpdateSupportedProtocolsUseCase,
private val updateSupportedProtocols: UpdateSelfUserSupportedProtocolsUseCase,
private val syncConversations: SyncConversationsUseCase,
private val syncConnections: SyncConnectionsUseCase,
private val syncSelfTeam: SyncSelfTeamUseCase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class AcceptConnectionRequestUseCaseTest {
assertEquals(AcceptConnectionRequestUseCaseResult.Success, result)
verify(arrangement.oneOnOneResolver)
.suspendFunction(arrangement.oneOnOneResolver::resolveOneOnOneConversationWithUserId)
.with(eq(CONNECTION.qualifiedToId))
.with(eq(CONNECTION.qualifiedToId), eq(true))
.wasInvoked(once)
}

Expand Down
Loading

0 comments on commit ee75ec7

Please sign in to comment.