From c32c1f6abb5fbe90f4b3bfbe5cbb7f7ec8f2d753 Mon Sep 17 00:00:00 2001 From: rishtigupta Date: Wed, 13 Nov 2024 10:20:12 -0800 Subject: [PATCH] feat: add topic sequence page --- build.gradle.kts | 4 +-- .../internal/InternalTopicClient.android.kt | 33 ++++++++++++------ .../sdk/internal/InternalTopicClient.kt | 2 +- .../sdk/internal/InternalTopicClient.jvm.kt | 34 +++++++++++++------ 4 files changed, 48 insertions(+), 25 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index fb339f4..e7fe21a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -63,7 +63,7 @@ kotlin { val jvmMain by getting { dependencies { implementation(kotlin("stdlib-jdk8")) - implementation("software.momento.kotlin:client-protos-jvm:0.114.0") + implementation("software.momento.kotlin:client-protos-jvm:0.119.2") runtimeOnly("io.grpc:grpc-netty:1.57.2") } } @@ -74,7 +74,7 @@ kotlin { } val androidMain by getting { dependencies { - implementation("software.momento.kotlin:client-protos-android:0.114.0") + implementation("software.momento.kotlin:client-protos-android:0.119.2") runtimeOnly("io.grpc:grpc-okhttp:1.57.2") } } diff --git a/src/androidMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.android.kt b/src/androidMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.android.kt index 826f93e..2643cc6 100644 --- a/src/androidMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.android.kt +++ b/src/androidMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.android.kt @@ -84,11 +84,11 @@ internal actual class InternalTopicClient actual constructor( } internal actual suspend fun subscribe( - cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long? + cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?, resumeAtTopicSequencePage: Long? ): TopicSubscribeResponse { return runCatching { ValidationUtils.requireValidCacheName(cacheName) - sendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber) + sendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber, resumeAtTopicSequencePage) }.fold(onSuccess = { flow -> TopicSubscribeResponse.Subscription(flow) }, onFailure = { e -> @@ -105,16 +105,17 @@ internal actual class InternalTopicClient actual constructor( * the first message so it can throw an error, otherwise errors are converted into messages and emitted. */ private suspend fun sendSubscribe( - cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long? + cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?, resumeAtTopicSequencePage: Long? ): Flow = flow { - var lastSequenceNumber: Long? = resumeAtTopicSequenceNumber + var lastSequenceNumber: Long? = resumeAtTopicSequenceNumber ?: 0 + var lastSequencePage: Long? = resumeAtTopicSequencePage ?: 0 var isFirstMessage = true var retry = true val metadata = metadataWithCache(cacheName) while (retry && currentCoroutineContext().isActive) { try { - val request = buildSubscriptionRequest(cacheName, topicName, lastSequenceNumber) + val request = buildSubscriptionRequest(cacheName, topicName, lastSequenceNumber, lastSequencePage) val subscriptionFlow = stubsManager.streamingStub.subscribe(request, metadata) subscriptionFlow.onEach { message -> if (isFirstMessage) { @@ -127,10 +128,11 @@ internal actual class InternalTopicClient actual constructor( } }.mapNotNull { message -> convertSubscriptionItem(message) - }.collect { (topicMessage, sequenceNumber) -> + }.collect { (topicMessage, sequenceNumber, sequencePage) -> if (!isFirstMessage) { topicMessage?.let { emit(it) } sequenceNumber?.let { lastSequenceNumber = it } + sequencePage?.let { lastSequencePage = it } } } } catch (e: Exception) { @@ -153,7 +155,7 @@ internal actual class InternalTopicClient actual constructor( } private fun buildSubscriptionRequest( - cacheName: String, topicName: String, lastSequenceNumber: Long? + cacheName: String, topicName: String, lastSequenceNumber: Long?, lastSequencePage: Long? ): _SubscriptionRequest { return _SubscriptionRequest.newBuilder().apply { this.cacheName = cacheName @@ -161,15 +163,24 @@ internal actual class InternalTopicClient actual constructor( if (lastSequenceNumber != null) { this.resumeAtTopicSequenceNumber = lastSequenceNumber } + if (lastSequencePage != null) { + this.sequencePage = lastSequencePage + } }.build() } - private data class ConvertedMessage(val message: TopicMessage?, val lastSequenceNumber: Long?) + private data class ConvertedMessage(val message: TopicMessage?, val lastSequenceNumber: Long?, val lastSequencePage: Long?) private fun convertSubscriptionItem(subscriptionItem: _SubscriptionItem): ConvertedMessage? { val lastSequenceNumber: Long? = when (subscriptionItem.kindCase) { _SubscriptionItem.KindCase.ITEM -> subscriptionItem.item.topicSequenceNumber - _SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.lastTopicSequence + _SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.newTopicSequence + else -> null + } + + val lastSequencePage: Long? = when (subscriptionItem.kindCase) { + _SubscriptionItem.KindCase.ITEM -> subscriptionItem.item.sequencePage + _SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.newSequencePage else -> null } @@ -189,8 +200,8 @@ internal actual class InternalTopicClient actual constructor( else -> null } - return if (topicMessage != null || lastSequenceNumber != null) { - ConvertedMessage(topicMessage, lastSequenceNumber) + return if (topicMessage != null || lastSequenceNumber != null || lastSequencePage != null) { + ConvertedMessage(topicMessage, lastSequenceNumber, lastSequencePage) } else { null } diff --git a/src/commonMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.kt b/src/commonMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.kt index 8df4372..b5dd0be 100644 --- a/src/commonMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.kt +++ b/src/commonMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.kt @@ -13,5 +13,5 @@ internal expect class InternalTopicClient(credentialProvider: CredentialProvider internal suspend fun publish(cacheName: String, topicName: String, value: ByteArray): TopicPublishResponse - internal suspend fun subscribe(cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long? = null): TopicSubscribeResponse + internal suspend fun subscribe(cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long? = null, resumeAtTopicSequencePage: Long? = null): TopicSubscribeResponse } diff --git a/src/jvmMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.jvm.kt b/src/jvmMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.jvm.kt index 1dfe4a9..80e607e 100644 --- a/src/jvmMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.jvm.kt +++ b/src/jvmMain/kotlin/software/momento/kotlin/sdk/internal/InternalTopicClient.jvm.kt @@ -83,11 +83,11 @@ internal actual class InternalTopicClient actual constructor( } internal actual suspend fun subscribe( - cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long? + cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?, resumeAtTopicSequencePage: Long? ): TopicSubscribeResponse { return runCatching { ValidationUtils.requireValidCacheName(cacheName) - sendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber) + sendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber, resumeAtTopicSequencePage) }.fold(onSuccess = { flow -> TopicSubscribeResponse.Subscription(flow) }, onFailure = { e -> @@ -105,16 +105,17 @@ internal actual class InternalTopicClient actual constructor( */ private suspend fun sendSubscribe( - cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long? + cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?, resumeAtTopicSequencePage: Long? ): Flow = flow { - var lastSequenceNumber: Long? = resumeAtTopicSequenceNumber + var lastSequenceNumber: Long? = resumeAtTopicSequenceNumber ?: 0 + var lastSequencePage: Long? = resumeAtTopicSequencePage ?: 0 var isFirstMessage = true var retry = true val metadata = metadataWithCache(cacheName) while (retry && currentCoroutineContext().isActive) { try { - val request = buildSubscriptionRequest(cacheName, topicName, lastSequenceNumber) + val request = buildSubscriptionRequest(cacheName, topicName, lastSequenceNumber, lastSequencePage) val subscriptionFlow = stubsManager.streamingStub.subscribe(request, metadata) subscriptionFlow.onEach { message -> if (isFirstMessage) { @@ -125,12 +126,14 @@ internal actual class InternalTopicClient actual constructor( ) } } + println("Received message: $message") }.mapNotNull { message -> convertSubscriptionItem(message) - }.collect { (topicMessage, sequenceNumber) -> + }.collect { (topicMessage, sequenceNumber, sequencePage) -> if (!isFirstMessage) { topicMessage?.let { emit(it) } sequenceNumber?.let { lastSequenceNumber = it } + sequencePage?.let { lastSequencePage = it } } } } catch (e: Exception) { @@ -153,7 +156,7 @@ internal actual class InternalTopicClient actual constructor( } private fun buildSubscriptionRequest( - cacheName: String, topicName: String, lastSequenceNumber: Long? + cacheName: String, topicName: String, lastSequenceNumber: Long?, lastSequencePage: Long? ): _SubscriptionRequest { return _SubscriptionRequest.newBuilder().apply { this.cacheName = cacheName @@ -161,15 +164,24 @@ internal actual class InternalTopicClient actual constructor( if (lastSequenceNumber != null) { this.resumeAtTopicSequenceNumber = lastSequenceNumber } + if (lastSequencePage != null) { + this.sequencePage = lastSequencePage + } }.build() } - private data class ConvertedMessage(val message: TopicMessage?, val lastSequenceNumber: Long?) + private data class ConvertedMessage(val message: TopicMessage?, val lastSequenceNumber: Long?, val lastSequencePage: Long?) private fun convertSubscriptionItem(subscriptionItem: _SubscriptionItem): ConvertedMessage? { val lastSequenceNumber: Long? = when (subscriptionItem.kindCase) { _SubscriptionItem.KindCase.ITEM -> subscriptionItem.item.topicSequenceNumber - _SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.lastTopicSequence + _SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.newTopicSequence + else -> null + } + + val lastSequencePage: Long? = when (subscriptionItem.kindCase) { + _SubscriptionItem.KindCase.ITEM -> subscriptionItem.item.sequencePage + _SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.newSequencePage else -> null } @@ -189,8 +201,8 @@ internal actual class InternalTopicClient actual constructor( else -> null } - return if (topicMessage != null || lastSequenceNumber != null) { - ConvertedMessage(topicMessage, lastSequenceNumber) + return if (topicMessage != null || lastSequenceNumber != null || lastSequencePage != null) { + ConvertedMessage(topicMessage, lastSequenceNumber, lastSequencePage) } else { null }