Skip to content

Commit

Permalink
Add test for transactional publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
vooft committed Jul 27, 2024
1 parent 763cc1e commit 3a1a248
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pg-kueue-core/src/main/kotlin/io/github/vooft/kueue/Kueue.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.github.vooft.kueue

interface Kueue<C, KC : KueueConnection<C>> {

suspend fun wrap(connection: C): KC

suspend fun send(topic: KueueTopic, message: String, kueueConnection: KC? = null)
suspend fun subscribe(topic: KueueTopic, block: suspend (String) -> Unit): KueueSubscription

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ abstract class SimpleKueueConnection<C>(protected val connection: C) : KueueConn
}

interface KueueConnectionProvider<C, KC : KueueConnection<C>> {
suspend fun wrap(connection: C): KC
suspend fun create(): KC
suspend fun close(connection: KC)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class KueueImpl<C, KC : KueueConnection<C>>(
private val multiplexer = Channel<KueueMessage>()
private val broadcaster = KueueMessageBroadcaster(multiplexer)

override suspend fun wrap(connection: C) = connectionProvider.wrap(connection)

override suspend fun send(topic: KueueTopic, message: String, kueueConnection: KC?) {
logger.debug { "Sending via $kueueConnection to $topic: $message" }
val connection = kueueConnection ?: getDefaultConnection()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import org.postgresql.core.BaseConnection
import javax.sql.DataSource

class DataSourceKueueConnectionProvider(private val dataSource: DataSource) : KueueConnectionProvider<BaseConnection, JdbcKueueConnection> {

override suspend fun wrap(connection: BaseConnection) = JdbcKueueConnection(jdbcConnection = connection)

override suspend fun create(): JdbcKueueConnection = withVirtualThreadDispatcher {
val connection = dataSource.connection
JdbcKueueConnection(jdbcConnection = connection.unwrap(BaseConnection::class.java))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.github.vooft.kueue.KueueTopic
import io.github.vooft.kueue.jdbc.JdbcKueueConnection
import io.github.vooft.kueue.jdbc.JdbcKueueConnectionPubSub
import io.github.vooft.kueue.jdbc.jdbc
import io.kotest.assertions.nondeterministic.continually
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldContainExactly
import kotlinx.coroutines.delay
Expand Down Expand Up @@ -54,7 +55,7 @@ class KueueManagerImplTest : IntegrationTest() {
}
}

eventually(5.seconds) {
eventually(1.seconds) {
for ((topic, messages) in produced) {
consumed[topic] shouldContainExactly messages
}
Expand All @@ -65,6 +66,7 @@ class KueueManagerImplTest : IntegrationTest() {
subscriptions.forEach { it.close() }
} finally {
kueue.close()
dataSource.close()
}
}

Expand All @@ -75,6 +77,8 @@ class KueueManagerImplTest : IntegrationTest() {

val remainingConnections = mutableListOf(connection1, connection2)
val connectionFactory = object : KueueConnectionProvider<BaseConnection, JdbcKueueConnection> {
override suspend fun wrap(connection: BaseConnection) = JdbcKueueConnection(connection)

override suspend fun create(): JdbcKueueConnection {
val connection = remainingConnections.removeFirst()
return JdbcKueueConnection(connection.unwrap(BaseConnection::class.java))
Expand Down Expand Up @@ -106,7 +110,7 @@ class KueueManagerImplTest : IntegrationTest() {
}
}

eventually(5.seconds) {
eventually(1.seconds) {
val currentConsumed = mutex.withLock { consumed.toMap() }
for ((topic, messages) in currentConsumed) {
messages shouldContainExactly batch1.getValue(topic)
Expand All @@ -124,7 +128,7 @@ class KueueManagerImplTest : IntegrationTest() {
}
}

eventually(5.seconds) {
eventually(1.seconds) {
val currentConsumed = mutex.withLock { consumed.toMap() }
for ((topic, messages) in currentConsumed) {
messages shouldContainExactly batch2.getValue(topic)
Expand All @@ -139,4 +143,163 @@ class KueueManagerImplTest : IntegrationTest() {
connection2.close()
}
}

@Test
fun `should send after transaction commit`(): Unit = runBlocking {
val dataSource = HikariDataSource(
HikariConfig().apply {
jdbcUrl = psql.jdbcUrl
username = psql.username
password = psql.password
}
)

val topic = KueueTopic(UUID.randomUUID().toString())

val kueue = Kueue.jdbc(dataSource)

try {
val mutex = Mutex()
val consumed = mutableListOf<String>()

val subscription = kueue.subscribe(topic) {
mutex.withLock { consumed.add(it) }
}

val before = UUID.randomUUID().toString()
val during = UUID.randomUUID().toString()
val after = UUID.randomUUID().toString()

val txn1 = UUID.randomUUID().toString()
val txn2 = UUID.randomUUID().toString()

// send normal message
kueue.send(topic, before)

eventually(1.seconds) {
mutex.withLock { consumed.toList() } shouldContainExactly listOf(before)
}

// start transaction
val transactionConnection = dataSource.connection
transactionConnection.autoCommit = false

val kueueTransactionConnection = kueue.wrap(transactionConnection.unwrap(BaseConnection::class.java))

// send transaction message 1
kueue.send(topic, txn1, kueueTransactionConnection)

// send normal message in-between transaction ones
kueue.send(topic, during)

// no txn message added
eventually(1.seconds) {
mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during)
}

// send second transaction message
kueue.send(topic, txn2, kueueTransactionConnection)

// still no txn messages
mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during)

// commit transaction
transactionConnection.commit()

// both txn messages added
eventually(1.seconds) {
mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during, txn1, txn2)
}

// send normal message after transaction over the transaction connection
transactionConnection.autoCommit = true
kueue.send(topic, after, kueueTransactionConnection)

// all messages added
eventually(1.seconds) {
mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during, txn1, txn2, after)
}

subscription.close()
} finally {
kueue.close()
dataSource.close()
}
}

@Test
fun `should not send after transaction rollback`(): Unit = runBlocking {
val dataSource = HikariDataSource(
HikariConfig().apply {
jdbcUrl = psql.jdbcUrl
username = psql.username
password = psql.password
}
)

val topic = KueueTopic(UUID.randomUUID().toString())

val kueue = Kueue.jdbc(dataSource)

try {
val mutex = Mutex()
val consumed = mutableListOf<String>()

val subscription = kueue.subscribe(topic) {
mutex.withLock { consumed.add(it) }
}

val before = UUID.randomUUID().toString()
val during = UUID.randomUUID().toString()
val after = UUID.randomUUID().toString()

val txn = UUID.randomUUID().toString()

// send normal message
kueue.send(topic, before)

eventually(1.seconds) {
mutex.withLock { consumed.toList() } shouldContainExactly listOf(before)
}

// start transaction
val transactionConnection = dataSource.connection
transactionConnection.autoCommit = false

val kueueTransactionConnection = kueue.wrap(transactionConnection.unwrap(BaseConnection::class.java))

// send transaction message
kueue.send(topic, txn, kueueTransactionConnection)

// send normal message in-between transaction ones
kueue.send(topic, during)

// no txn message added
eventually(1.seconds) {
mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during)
}

// rollback transaction
transactionConnection.rollback()

// still no txn messages
continually(1.seconds) {
mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during)
}

// send normal message after transaction over the transaction connection
transactionConnection.autoCommit = true
kueue.send(topic, after, kueueTransactionConnection)

// all messages added
eventually(1.seconds) {
mutex.withLock { consumed.toList() } shouldContainExactly listOf(before, during, after)
}

subscription.close()
} finally {
kueue.close()
dataSource.close()
}
}
}

0 comments on commit 3a1a248

Please sign in to comment.