From 0d16a9c995e640df9891aed2d0574173cbf61ae7 Mon Sep 17 00:00:00 2001 From: vooft <52610+vooft@users.noreply.github.com> Date: Sat, 21 Sep 2024 20:42:53 +0400 Subject: [PATCH] Add method to create topic --- .../io/github/vooft/kueue/log/KueueLog.kt | 1 + .../vooft/kueue/log/impl/KueueLogImpl.kt | 25 +++++++++++++++++++ .../vooft/kueue/persistence/KueuePersister.kt | 1 + 3 files changed, 27 insertions(+) diff --git a/pg-kueue-log/pg-kueue-log-core/src/main/kotlin/io/github/vooft/kueue/log/KueueLog.kt b/pg-kueue-log/pg-kueue-log-core/src/main/kotlin/io/github/vooft/kueue/log/KueueLog.kt index 896f3a7..6d61e49 100644 --- a/pg-kueue-log/pg-kueue-log-core/src/main/kotlin/io/github/vooft/kueue/log/KueueLog.kt +++ b/pg-kueue-log/pg-kueue-log-core/src/main/kotlin/io/github/vooft/kueue/log/KueueLog.kt @@ -5,6 +5,7 @@ import io.github.vooft.kueue.KueueTopic import io.github.vooft.kueue.persistence.KueueConsumerGroup interface KueueLog> { + suspend fun createTopic(topic: KueueTopic, partitions: Int) suspend fun createProducer(topic: KueueTopic): KueueProducer suspend fun createConsumer(topic: KueueTopic, group: KueueConsumerGroup): KueueConsumer diff --git a/pg-kueue-log/pg-kueue-log-core/src/main/kotlin/io/github/vooft/kueue/log/impl/KueueLogImpl.kt b/pg-kueue-log/pg-kueue-log-core/src/main/kotlin/io/github/vooft/kueue/log/impl/KueueLogImpl.kt index 71535fe..d42bb0f 100644 --- a/pg-kueue-log/pg-kueue-log-core/src/main/kotlin/io/github/vooft/kueue/log/impl/KueueLogImpl.kt +++ b/pg-kueue-log/pg-kueue-log-core/src/main/kotlin/io/github/vooft/kueue/log/impl/KueueLogImpl.kt @@ -9,15 +9,40 @@ import io.github.vooft.kueue.log.KueueProducer import io.github.vooft.kueue.log.impl.consumer.KueueConsumerDao import io.github.vooft.kueue.log.impl.consumer.KueueConsumerImpl import io.github.vooft.kueue.log.impl.producer.KueueProducerImpl +import io.github.vooft.kueue.log.impl.producer.withConnection import io.github.vooft.kueue.persistence.KueueConsumerGroup import io.github.vooft.kueue.persistence.KueueConsumerMessagePoller +import io.github.vooft.kueue.persistence.KueuePartitionIndex +import io.github.vooft.kueue.persistence.KueuePartitionOffset import io.github.vooft.kueue.persistence.KueuePersister +import io.github.vooft.kueue.persistence.KueueTopicModel +import io.github.vooft.kueue.persistence.KueueTopicPartitionModel +import java.time.Instant class KueueLogImpl>( private val connectionProvider: KueueConnectionProvider, private val persister: KueuePersister, private val poller: KueueConsumerMessagePoller ) : KueueLog { + + override suspend fun createTopic(topic: KueueTopic, partitions: Int) { + connectionProvider.withConnection { kc -> + persister.withTransaction(kc) { c -> + persister.upsert(KueueTopicModel(name = topic, partitions = partitions, createdAt = Instant.now()), c) + repeat(partitions) { + persister.upsert( + model = KueueTopicPartitionModel( + topic = topic, + partitionIndex = KueuePartitionIndex(it), + nextPartitionOffset = KueuePartitionOffset(1) + ), + connection = c + ) + } + } + } + } + override suspend fun createProducer(topic: KueueTopic): KueueProducer = KueueProducerImpl(topic = topic, connectionProvider = connectionProvider, persister = persister) diff --git a/pg-kueue-persistence/pg-kueue-persistence-core/src/main/kotlin/io/github/vooft/kueue/persistence/KueuePersister.kt b/pg-kueue-persistence/pg-kueue-persistence-core/src/main/kotlin/io/github/vooft/kueue/persistence/KueuePersister.kt index a02505c..b25a29d 100644 --- a/pg-kueue-persistence/pg-kueue-persistence-core/src/main/kotlin/io/github/vooft/kueue/persistence/KueuePersister.kt +++ b/pg-kueue-persistence/pg-kueue-persistence-core/src/main/kotlin/io/github/vooft/kueue/persistence/KueuePersister.kt @@ -37,6 +37,7 @@ interface KueuePersister> { suspend fun delete(model: KueueConnectedConsumerModel, connection: C) + // TODO: move to a better place suspend fun withTransaction(kueueConnection: KC, block: suspend (C) -> T): T }