@@ -48,8 +48,32 @@ import kotlin.time.Duration
48
48
interface OneOnOneResolver {
49
49
suspend fun resolveAllOneOnOneConversations (synchronizeUsers : Boolean = false): Either <CoreFailure , Unit >
50
50
suspend fun scheduleResolveOneOnOneConversationWithUserId (userId : UserId , delay : Duration = Duration .ZERO ): Job
51
- suspend fun resolveOneOnOneConversationWithUserId (userId : UserId ): Either <CoreFailure , ConversationId >
52
- suspend fun resolveOneOnOneConversationWithUser (user : OtherUser ): Either <CoreFailure , ConversationId >
51
+
52
+ /* *
53
+ * Resolves a one-on-one conversation with a user based on their userId.
54
+ *
55
+ * @param userId The userId of the other user in the conversation.
56
+ * @param invalidateCurrentKnownProtocols Flag indicating whether to whether it should attempt refreshing the other user's list of
57
+ * supported protocols by fetching from remote. In case of failure, the local result will be used as a fallback.
58
+ * @return Either a [CoreFailure] if there is an error or a [ConversationId] if the resolution is successful.
59
+ */
60
+ suspend fun resolveOneOnOneConversationWithUserId (
61
+ userId : UserId ,
62
+ invalidateCurrentKnownProtocols : Boolean ,
63
+ ): Either <CoreFailure , ConversationId >
64
+
65
+ /* *
66
+ * Resolves a one-on-one conversation with a user.
67
+ *
68
+ * @param user The other user in the conversation.
69
+ * @param invalidateCurrentKnownProtocols Flag indicating whether to whether it should attempt refreshing the other user's list of
70
+ * supported protocols by fetching from remote. In case of failure, the local result will be used as a fallback.
71
+ * @return Either a [CoreFailure] if there is an error or a [ConversationId] if the resolution is successful.
72
+ */
73
+ suspend fun resolveOneOnOneConversationWithUser (
74
+ user : OtherUser ,
75
+ invalidateCurrentKnownProtocols : Boolean ,
76
+ ): Either <CoreFailure , ConversationId >
53
77
}
54
78
55
79
internal class OneOnOneResolverImpl (
@@ -62,52 +86,74 @@ internal class OneOnOneResolverImpl(
62
86
63
87
@OptIn(ExperimentalCoroutinesApi ::class )
64
88
private val dispatcher = kaliumDispatcher.default.limitedParallelism(1 )
89
+
90
+ // TODO: inherit the scope of UserSessionScope so it's cancelled if user logs out, etc.
65
91
private val resolveActiveOneOnOneScope = CoroutineScope (dispatcher)
66
92
67
93
override suspend fun resolveAllOneOnOneConversations (synchronizeUsers : Boolean ): Either <CoreFailure , Unit > =
68
- if (synchronizeUsers) {
69
- userRepository.fetchAllOtherUsers()
70
- } else {
71
- Either .Right (Unit )
72
- }.flatMap {
94
+ fetchAllOtherUsersIfNeeded(synchronizeUsers).flatMap {
73
95
val usersWithOneOnOne = userRepository.getUsersWithOneOnOneConversation()
74
96
kaliumLogger.i(" Resolving one-on-one protocol for ${usersWithOneOnOne.size} user(s)" )
75
97
usersWithOneOnOne.foldToEitherWhileRight(Unit ) { item, _ ->
76
- resolveOneOnOneConversationWithUser(item).flatMapLeft {
77
- when (it) {
78
- is CoreFailure .NoKeyPackagesAvailable ,
79
- is NetworkFailure .ServerMiscommunication ,
80
- is NetworkFailure .FederatedBackendFailure ,
81
- is CoreFailure .NoCommonProtocolFound
82
- -> {
83
- kaliumLogger.e(" Resolving one-on-one failed $it , skipping" )
84
- Either .Right (Unit )
85
- }
86
-
87
- else -> {
88
- kaliumLogger.e(" Resolving one-on-one failed $it , retrying" )
89
- Either .Left (it)
90
- }
91
- }
98
+ resolveOneOnOneConversationWithUser(
99
+ user = item,
100
+ // Either it fetched all users on the previous step, or it's not needed
101
+ invalidateCurrentKnownProtocols = false
102
+ ).flatMapLeft {
103
+ handleBatchEntryFailure(it)
92
104
}.map { }
93
105
}
94
106
}
95
107
108
+ private fun handleBatchEntryFailure (it : CoreFailure ) = when (it) {
109
+ is CoreFailure .NoKeyPackagesAvailable ,
110
+ is NetworkFailure .ServerMiscommunication ,
111
+ is NetworkFailure .FederatedBackendFailure ,
112
+ is CoreFailure .NoCommonProtocolFound
113
+ -> {
114
+ kaliumLogger.e(" Resolving one-on-one failed $it , skipping" )
115
+ Either .Right (Unit )
116
+ }
117
+
118
+ else -> {
119
+ kaliumLogger.e(" Resolving one-on-one failed $it , retrying" )
120
+ Either .Left (it)
121
+ }
122
+ }
123
+
124
+ private suspend fun fetchAllOtherUsersIfNeeded (synchronizeUsers : Boolean ) = if (synchronizeUsers) {
125
+ userRepository.fetchAllOtherUsers()
126
+ } else {
127
+ Either .Right (Unit )
128
+ }
129
+
96
130
override suspend fun scheduleResolveOneOnOneConversationWithUserId (userId : UserId , delay : Duration ) =
97
131
resolveActiveOneOnOneScope.launch {
98
132
kaliumLogger.d(" Schedule resolving active one-on-one" )
99
133
incrementalSyncRepository.incrementalSyncState.first { it is IncrementalSyncStatus .Live }
100
134
delay(delay)
101
- resolveOneOnOneConversationWithUserId(userId)
135
+ resolveOneOnOneConversationWithUserId(
136
+ userId = userId,
137
+ invalidateCurrentKnownProtocols = true
138
+ )
102
139
}
103
140
104
- override suspend fun resolveOneOnOneConversationWithUserId (userId : UserId ): Either <CoreFailure , ConversationId > =
141
+ override suspend fun resolveOneOnOneConversationWithUserId (
142
+ userId : UserId ,
143
+ invalidateCurrentKnownProtocols : Boolean
144
+ ): Either <CoreFailure , ConversationId > =
105
145
userRepository.getKnownUser(userId).firstOrNull()?.let {
106
- resolveOneOnOneConversationWithUser(it)
146
+ resolveOneOnOneConversationWithUser(it, invalidateCurrentKnownProtocols )
107
147
} ? : Either .Left (StorageFailure .DataNotFound )
108
148
109
- override suspend fun resolveOneOnOneConversationWithUser (user : OtherUser ): Either <CoreFailure , ConversationId > {
149
+ override suspend fun resolveOneOnOneConversationWithUser (
150
+ user : OtherUser ,
151
+ invalidateCurrentKnownProtocols : Boolean ,
152
+ ): Either <CoreFailure , ConversationId > {
110
153
kaliumLogger.i(" Resolving one-on-one protocol for ${user.id.toLogString()} " )
154
+ if (invalidateCurrentKnownProtocols) {
155
+ userRepository.fetchUsersByIds(setOf (user.id))
156
+ }
111
157
return oneOnOneProtocolSelector.getProtocolForUser(user.id).flatMap { supportedProtocol ->
112
158
when (supportedProtocol) {
113
159
SupportedProtocol .PROTEUS -> oneOnOneMigrator.migrateToProteus(user)
0 commit comments