Skip to content

Commit

Permalink
Add method to create topic
Browse files Browse the repository at this point in the history
  • Loading branch information
vooft committed Sep 21, 2024
1 parent 1e3def3 commit 0d16a9c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.github.vooft.kueue.KueueTopic
import io.github.vooft.kueue.persistence.KueueConsumerGroup

interface KueueLog<C, KC : KueueConnection<C>> {
suspend fun createTopic(topic: KueueTopic, partitions: Int)
suspend fun createProducer(topic: KueueTopic): KueueProducer<C, KC>
suspend fun createConsumer(topic: KueueTopic, group: KueueConsumerGroup): KueueConsumer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,40 @@ import io.github.vooft.kueue.log.KueueProducer
import io.github.vooft.kueue.log.impl.consumer.KueueConsumerDao
import io.github.vooft.kueue.log.impl.consumer.KueueConsumerImpl
import io.github.vooft.kueue.log.impl.producer.KueueProducerImpl
import io.github.vooft.kueue.log.impl.producer.withConnection
import io.github.vooft.kueue.persistence.KueueConsumerGroup
import io.github.vooft.kueue.persistence.KueueConsumerMessagePoller
import io.github.vooft.kueue.persistence.KueuePartitionIndex
import io.github.vooft.kueue.persistence.KueuePartitionOffset
import io.github.vooft.kueue.persistence.KueuePersister
import io.github.vooft.kueue.persistence.KueueTopicModel
import io.github.vooft.kueue.persistence.KueueTopicPartitionModel
import java.time.Instant

class KueueLogImpl<C, KC : KueueConnection<C>>(
private val connectionProvider: KueueConnectionProvider<C, KC>,
private val persister: KueuePersister<C, KC>,
private val poller: KueueConsumerMessagePoller
) : KueueLog<C, KC> {

override suspend fun createTopic(topic: KueueTopic, partitions: Int) {
connectionProvider.withConnection { kc ->
persister.withTransaction(kc) { c ->
persister.upsert(KueueTopicModel(name = topic, partitions = partitions, createdAt = Instant.now()), c)
repeat(partitions) {
persister.upsert(
model = KueueTopicPartitionModel(
topic = topic,
partitionIndex = KueuePartitionIndex(it),
nextPartitionOffset = KueuePartitionOffset(1)
),
connection = c
)
}
}
}
}

override suspend fun createProducer(topic: KueueTopic): KueueProducer<C, KC> =
KueueProducerImpl(topic = topic, connectionProvider = connectionProvider, persister = persister)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ interface KueuePersister<C, KC : KueueConnection<C>> {

suspend fun delete(model: KueueConnectedConsumerModel, connection: C)

// TODO: move to a better place
suspend fun <T> withTransaction(kueueConnection: KC, block: suspend (C) -> T): T
}

Expand Down

0 comments on commit 0d16a9c

Please sign in to comment.