Skip to content

Commit

Permalink
feat: add topic sequence page
Browse files Browse the repository at this point in the history
  • Loading branch information
rishtigupta committed Nov 13, 2024
1 parent 5a103ac commit c32c1f6
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 25 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ kotlin {
val jvmMain by getting {
dependencies {
implementation(kotlin("stdlib-jdk8"))
implementation("software.momento.kotlin:client-protos-jvm:0.114.0")
implementation("software.momento.kotlin:client-protos-jvm:0.119.2")
runtimeOnly("io.grpc:grpc-netty:1.57.2")
}
}
Expand All @@ -74,7 +74,7 @@ kotlin {
}
val androidMain by getting {
dependencies {
implementation("software.momento.kotlin:client-protos-android:0.114.0")
implementation("software.momento.kotlin:client-protos-android:0.119.2")
runtimeOnly("io.grpc:grpc-okhttp:1.57.2")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ internal actual class InternalTopicClient actual constructor(
}

internal actual suspend fun subscribe(
cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?
cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?, resumeAtTopicSequencePage: Long?
): TopicSubscribeResponse {
return runCatching {
ValidationUtils.requireValidCacheName(cacheName)
sendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber)
sendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber, resumeAtTopicSequencePage)
}.fold(onSuccess = { flow ->
TopicSubscribeResponse.Subscription(flow)
}, onFailure = { e ->
Expand All @@ -105,16 +105,17 @@ internal actual class InternalTopicClient actual constructor(
* the first message so it can throw an error, otherwise errors are converted into messages and emitted.
*/
private suspend fun sendSubscribe(
cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?
cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?, resumeAtTopicSequencePage: Long?
): Flow<TopicMessage> = flow {
var lastSequenceNumber: Long? = resumeAtTopicSequenceNumber
var lastSequenceNumber: Long? = resumeAtTopicSequenceNumber ?: 0
var lastSequencePage: Long? = resumeAtTopicSequencePage ?: 0
var isFirstMessage = true
var retry = true

val metadata = metadataWithCache(cacheName)
while (retry && currentCoroutineContext().isActive) {
try {
val request = buildSubscriptionRequest(cacheName, topicName, lastSequenceNumber)
val request = buildSubscriptionRequest(cacheName, topicName, lastSequenceNumber, lastSequencePage)
val subscriptionFlow = stubsManager.streamingStub.subscribe(request, metadata)
subscriptionFlow.onEach { message ->
if (isFirstMessage) {
Expand All @@ -127,10 +128,11 @@ internal actual class InternalTopicClient actual constructor(
}
}.mapNotNull { message ->
convertSubscriptionItem(message)
}.collect { (topicMessage, sequenceNumber) ->
}.collect { (topicMessage, sequenceNumber, sequencePage) ->
if (!isFirstMessage) {
topicMessage?.let { emit(it) }
sequenceNumber?.let { lastSequenceNumber = it }
sequencePage?.let { lastSequencePage = it }
}
}
} catch (e: Exception) {
Expand All @@ -153,23 +155,32 @@ internal actual class InternalTopicClient actual constructor(
}

private fun buildSubscriptionRequest(
cacheName: String, topicName: String, lastSequenceNumber: Long?
cacheName: String, topicName: String, lastSequenceNumber: Long?, lastSequencePage: Long?
): _SubscriptionRequest {
return _SubscriptionRequest.newBuilder().apply {
this.cacheName = cacheName
this.topic = topicName
if (lastSequenceNumber != null) {
this.resumeAtTopicSequenceNumber = lastSequenceNumber
}
if (lastSequencePage != null) {
this.sequencePage = lastSequencePage
}
}.build()
}

private data class ConvertedMessage(val message: TopicMessage?, val lastSequenceNumber: Long?)
private data class ConvertedMessage(val message: TopicMessage?, val lastSequenceNumber: Long?, val lastSequencePage: Long?)

private fun convertSubscriptionItem(subscriptionItem: _SubscriptionItem): ConvertedMessage? {
val lastSequenceNumber: Long? = when (subscriptionItem.kindCase) {
_SubscriptionItem.KindCase.ITEM -> subscriptionItem.item.topicSequenceNumber
_SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.lastTopicSequence
_SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.newTopicSequence
else -> null
}

val lastSequencePage: Long? = when (subscriptionItem.kindCase) {
_SubscriptionItem.KindCase.ITEM -> subscriptionItem.item.sequencePage
_SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.newSequencePage
else -> null
}

Expand All @@ -189,8 +200,8 @@ internal actual class InternalTopicClient actual constructor(
else -> null
}

return if (topicMessage != null || lastSequenceNumber != null) {
ConvertedMessage(topicMessage, lastSequenceNumber)
return if (topicMessage != null || lastSequenceNumber != null || lastSequencePage != null) {
ConvertedMessage(topicMessage, lastSequenceNumber, lastSequencePage)
} else {
null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ internal expect class InternalTopicClient(credentialProvider: CredentialProvider

internal suspend fun publish(cacheName: String, topicName: String, value: ByteArray): TopicPublishResponse

internal suspend fun subscribe(cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long? = null): TopicSubscribeResponse
internal suspend fun subscribe(cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long? = null, resumeAtTopicSequencePage: Long? = null): TopicSubscribeResponse
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ internal actual class InternalTopicClient actual constructor(
}

internal actual suspend fun subscribe(
cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?
cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?, resumeAtTopicSequencePage: Long?
): TopicSubscribeResponse {
return runCatching {
ValidationUtils.requireValidCacheName(cacheName)
sendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber)
sendSubscribe(cacheName, topicName, resumeAtTopicSequenceNumber, resumeAtTopicSequencePage)
}.fold(onSuccess = { flow ->
TopicSubscribeResponse.Subscription(flow)
}, onFailure = { e ->
Expand All @@ -105,16 +105,17 @@ internal actual class InternalTopicClient actual constructor(
*/

private suspend fun sendSubscribe(
cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?
cacheName: String, topicName: String, resumeAtTopicSequenceNumber: Long?, resumeAtTopicSequencePage: Long?
): Flow<TopicMessage> = flow {
var lastSequenceNumber: Long? = resumeAtTopicSequenceNumber
var lastSequenceNumber: Long? = resumeAtTopicSequenceNumber ?: 0
var lastSequencePage: Long? = resumeAtTopicSequencePage ?: 0
var isFirstMessage = true
var retry = true

val metadata = metadataWithCache(cacheName)
while (retry && currentCoroutineContext().isActive) {
try {
val request = buildSubscriptionRequest(cacheName, topicName, lastSequenceNumber)
val request = buildSubscriptionRequest(cacheName, topicName, lastSequenceNumber, lastSequencePage)
val subscriptionFlow = stubsManager.streamingStub.subscribe(request, metadata)
subscriptionFlow.onEach { message ->
if (isFirstMessage) {
Expand All @@ -125,12 +126,14 @@ internal actual class InternalTopicClient actual constructor(
)
}
}
println("Received message: $message")
}.mapNotNull { message ->
convertSubscriptionItem(message)
}.collect { (topicMessage, sequenceNumber) ->
}.collect { (topicMessage, sequenceNumber, sequencePage) ->
if (!isFirstMessage) {
topicMessage?.let { emit(it) }
sequenceNumber?.let { lastSequenceNumber = it }
sequencePage?.let { lastSequencePage = it }
}
}
} catch (e: Exception) {
Expand All @@ -153,23 +156,32 @@ internal actual class InternalTopicClient actual constructor(
}

private fun buildSubscriptionRequest(
cacheName: String, topicName: String, lastSequenceNumber: Long?
cacheName: String, topicName: String, lastSequenceNumber: Long?, lastSequencePage: Long?
): _SubscriptionRequest {
return _SubscriptionRequest.newBuilder().apply {
this.cacheName = cacheName
this.topic = topicName
if (lastSequenceNumber != null) {
this.resumeAtTopicSequenceNumber = lastSequenceNumber
}
if (lastSequencePage != null) {
this.sequencePage = lastSequencePage
}
}.build()
}

private data class ConvertedMessage(val message: TopicMessage?, val lastSequenceNumber: Long?)
private data class ConvertedMessage(val message: TopicMessage?, val lastSequenceNumber: Long?, val lastSequencePage: Long?)

private fun convertSubscriptionItem(subscriptionItem: _SubscriptionItem): ConvertedMessage? {
val lastSequenceNumber: Long? = when (subscriptionItem.kindCase) {
_SubscriptionItem.KindCase.ITEM -> subscriptionItem.item.topicSequenceNumber
_SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.lastTopicSequence
_SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.newTopicSequence
else -> null
}

val lastSequencePage: Long? = when (subscriptionItem.kindCase) {
_SubscriptionItem.KindCase.ITEM -> subscriptionItem.item.sequencePage
_SubscriptionItem.KindCase.DISCONTINUITY -> subscriptionItem.discontinuity.newSequencePage
else -> null
}

Expand All @@ -189,8 +201,8 @@ internal actual class InternalTopicClient actual constructor(
else -> null
}

return if (topicMessage != null || lastSequenceNumber != null) {
ConvertedMessage(topicMessage, lastSequenceNumber)
return if (topicMessage != null || lastSequenceNumber != null || lastSequencePage != null) {
ConvertedMessage(topicMessage, lastSequenceNumber, lastSequencePage)
} else {
null
}
Expand Down

0 comments on commit c32c1f6

Please sign in to comment.