From 3a1a2480a125f6a84401dc136bc10894e8d5816d Mon Sep 17 00:00:00 2001 From: vooft <52610+vooft@users.noreply.github.com> Date: Sat, 27 Jul 2024 21:13:11 +0400 Subject: [PATCH] Add test for transactional publishing --- .../kotlin/io/github/vooft/kueue/Kueue.kt | 3 + .../io/github/vooft/kueue/KueueConnection.kt | 1 + .../io/github/vooft/kueue/impl/KueueImpl.kt | 2 + .../jdbc/DataSourceKueueConnectionProvider.kt | 3 + .../vooft/kueue/impl/KueueManagerImplTest.kt | 169 +++++++++++++++++- 5 files changed, 175 insertions(+), 3 deletions(-) diff --git a/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/Kueue.kt b/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/Kueue.kt index 0683c61..a69742d 100644 --- a/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/Kueue.kt +++ b/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/Kueue.kt @@ -1,6 +1,9 @@ package io.github.vooft.kueue interface Kueue> { + + suspend fun wrap(connection: C): KC + suspend fun send(topic: KueueTopic, message: String, kueueConnection: KC? = null) suspend fun subscribe(topic: KueueTopic, block: suspend (String) -> Unit): KueueSubscription diff --git a/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/KueueConnection.kt b/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/KueueConnection.kt index 002e5f5..543c183 100644 --- a/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/KueueConnection.kt +++ b/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/KueueConnection.kt @@ -34,6 +34,7 @@ abstract class SimpleKueueConnection(protected val connection: C) : KueueConn } interface KueueConnectionProvider> { + suspend fun wrap(connection: C): KC suspend fun create(): KC suspend fun close(connection: KC) } diff --git a/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/impl/KueueImpl.kt b/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/impl/KueueImpl.kt index a59eea8..ec4238c 100644 --- a/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/impl/KueueImpl.kt +++ b/pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/impl/KueueImpl.kt @@ -49,6 +49,8 @@ class KueueImpl>( private val multiplexer = Channel() private val broadcaster = KueueMessageBroadcaster(multiplexer) + override suspend fun wrap(connection: C) = connectionProvider.wrap(connection) + override suspend fun send(topic: KueueTopic, message: String, kueueConnection: KC?) { logger.debug { "Sending via $kueueConnection to $topic: $message" } val connection = kueueConnection ?: getDefaultConnection() diff --git a/pg-kueue-jdbc/src/main/kotlin/io/github/vooft/kueue/jdbc/DataSourceKueueConnectionProvider.kt b/pg-kueue-jdbc/src/main/kotlin/io/github/vooft/kueue/jdbc/DataSourceKueueConnectionProvider.kt index 94b7569..ad7732e 100644 --- a/pg-kueue-jdbc/src/main/kotlin/io/github/vooft/kueue/jdbc/DataSourceKueueConnectionProvider.kt +++ b/pg-kueue-jdbc/src/main/kotlin/io/github/vooft/kueue/jdbc/DataSourceKueueConnectionProvider.kt @@ -6,6 +6,9 @@ import org.postgresql.core.BaseConnection import javax.sql.DataSource class DataSourceKueueConnectionProvider(private val dataSource: DataSource) : KueueConnectionProvider { + + override suspend fun wrap(connection: BaseConnection) = JdbcKueueConnection(jdbcConnection = connection) + override suspend fun create(): JdbcKueueConnection = withVirtualThreadDispatcher { val connection = dataSource.connection JdbcKueueConnection(jdbcConnection = connection.unwrap(BaseConnection::class.java)) diff --git a/pg-kueue-test/src/test/kotlin/io/github/vooft/kueue/impl/KueueManagerImplTest.kt b/pg-kueue-test/src/test/kotlin/io/github/vooft/kueue/impl/KueueManagerImplTest.kt index d0b11f4..9650172 100644 --- a/pg-kueue-test/src/test/kotlin/io/github/vooft/kueue/impl/KueueManagerImplTest.kt +++ b/pg-kueue-test/src/test/kotlin/io/github/vooft/kueue/impl/KueueManagerImplTest.kt @@ -9,6 +9,7 @@ import io.github.vooft.kueue.KueueTopic import io.github.vooft.kueue.jdbc.JdbcKueueConnection import io.github.vooft.kueue.jdbc.JdbcKueueConnectionPubSub import io.github.vooft.kueue.jdbc.jdbc +import io.kotest.assertions.nondeterministic.continually import io.kotest.assertions.nondeterministic.eventually import io.kotest.matchers.collections.shouldContainExactly import kotlinx.coroutines.delay @@ -54,7 +55,7 @@ class KueueManagerImplTest : IntegrationTest() { } } - eventually(5.seconds) { + eventually(1.seconds) { for ((topic, messages) in produced) { consumed[topic] shouldContainExactly messages } @@ -65,6 +66,7 @@ class KueueManagerImplTest : IntegrationTest() { subscriptions.forEach { it.close() } } finally { kueue.close() + dataSource.close() } } @@ -75,6 +77,8 @@ class KueueManagerImplTest : IntegrationTest() { val remainingConnections = mutableListOf(connection1, connection2) val connectionFactory = object : KueueConnectionProvider { + override suspend fun wrap(connection: BaseConnection) = JdbcKueueConnection(connection) + override suspend fun create(): JdbcKueueConnection { val connection = remainingConnections.removeFirst() return JdbcKueueConnection(connection.unwrap(BaseConnection::class.java)) @@ -106,7 +110,7 @@ class KueueManagerImplTest : IntegrationTest() { } } - eventually(5.seconds) { + eventually(1.seconds) { val currentConsumed = mutex.withLock { consumed.toMap() } for ((topic, messages) in currentConsumed) { messages shouldContainExactly batch1.getValue(topic) @@ -124,7 +128,7 @@ class KueueManagerImplTest : IntegrationTest() { } } - eventually(5.seconds) { + eventually(1.seconds) { val currentConsumed = mutex.withLock { consumed.toMap() } for ((topic, messages) in currentConsumed) { messages shouldContainExactly batch2.getValue(topic) @@ -139,4 +143,163 @@ class KueueManagerImplTest : IntegrationTest() { connection2.close() } } + + @Test + fun `should send after transaction commit`(): Unit = runBlocking { + val dataSource = HikariDataSource( + HikariConfig().apply { + jdbcUrl = psql.jdbcUrl + username = psql.username + password = psql.password + } + ) + + val topic = KueueTopic(UUID.randomUUID().toString()) + + val kueue = Kueue.jdbc(dataSource) + + try { + val mutex = Mutex() + val consumed = mutableListOf() + + val subscription = kueue.subscribe(topic) { + mutex.withLock { consumed.add(it) } + } + + val before = UUID.randomUUID().toString() + val during = UUID.randomUUID().toString() + val after = UUID.randomUUID().toString() + + val txn1 = UUID.randomUUID().toString() + val txn2 = UUID.randomUUID().toString() + + // send normal message + kueue.send(topic, before) + + eventually(1.seconds) { + mutex.withLock { consumed.toList() } shouldContainExactly listOf(before) + } + + // start transaction + val transactionConnection = dataSource.connection + transactionConnection.autoCommit = false + + val kueueTransactionConnection = kueue.wrap(transactionConnection.unwrap(BaseConnection::class.java)) + + // send transaction message 1 + kueue.send(topic, txn1, kueueTransactionConnection) + + // send normal message in-between transaction ones + kueue.send(topic, during) + + // no txn message added + eventually(1.seconds) { + mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during) + } + + // send second transaction message + kueue.send(topic, txn2, kueueTransactionConnection) + + // still no txn messages + mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during) + + // commit transaction + transactionConnection.commit() + + // both txn messages added + eventually(1.seconds) { + mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during, txn1, txn2) + } + + // send normal message after transaction over the transaction connection + transactionConnection.autoCommit = true + kueue.send(topic, after, kueueTransactionConnection) + + // all messages added + eventually(1.seconds) { + mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during, txn1, txn2, after) + } + + subscription.close() + } finally { + kueue.close() + dataSource.close() + } + } + + @Test + fun `should not send after transaction rollback`(): Unit = runBlocking { + val dataSource = HikariDataSource( + HikariConfig().apply { + jdbcUrl = psql.jdbcUrl + username = psql.username + password = psql.password + } + ) + + val topic = KueueTopic(UUID.randomUUID().toString()) + + val kueue = Kueue.jdbc(dataSource) + + try { + val mutex = Mutex() + val consumed = mutableListOf() + + val subscription = kueue.subscribe(topic) { + mutex.withLock { consumed.add(it) } + } + + val before = UUID.randomUUID().toString() + val during = UUID.randomUUID().toString() + val after = UUID.randomUUID().toString() + + val txn = UUID.randomUUID().toString() + + // send normal message + kueue.send(topic, before) + + eventually(1.seconds) { + mutex.withLock { consumed.toList() } shouldContainExactly listOf(before) + } + + // start transaction + val transactionConnection = dataSource.connection + transactionConnection.autoCommit = false + + val kueueTransactionConnection = kueue.wrap(transactionConnection.unwrap(BaseConnection::class.java)) + + // send transaction message + kueue.send(topic, txn, kueueTransactionConnection) + + // send normal message in-between transaction ones + kueue.send(topic, during) + + // no txn message added + eventually(1.seconds) { + mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during) + } + + // rollback transaction + transactionConnection.rollback() + + // still no txn messages + continually(1.seconds) { + mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during) + } + + // send normal message after transaction over the transaction connection + transactionConnection.autoCommit = true + kueue.send(topic, after, kueueTransactionConnection) + + // all messages added + eventually(1.seconds) { + mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during, after) + } + + subscription.close() + } finally { + kueue.close() + dataSource.close() + } + } }