Skip to content

Commit

Permalink
Use single notification polling coroutine for all listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
vooft committed Jul 27, 2024
1 parent 1b95e5f commit 763cc1e
Showing 1 changed file with 83 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import io.github.vooft.kueue.IntegrationTest
import io.github.vooft.kueue.Kueue
import io.github.vooft.kueue.KueueConnectionProvider
import io.github.vooft.kueue.KueueTopic
import io.github.vooft.kueue.jdbc.JdbcKueueConnection
import io.github.vooft.kueue.jdbc.JdbcKueueConnectionPubSub
import io.github.vooft.kueue.jdbc.jdbc
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.matchers.collections.shouldContainExactly
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.junit.jupiter.api.Test
import org.postgresql.core.BaseConnection
import java.sql.DriverManager
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration.Companion.seconds

class KueueManagerImplTest : IntegrationTest() {
Expand All @@ -30,9 +36,12 @@ class KueueManagerImplTest : IntegrationTest() {

val kueue = Kueue.jdbc(dataSource)
try {
val consumed = ConcurrentHashMap<KueueTopic, MutableList<String>>()
val mutex = Mutex()
val consumed = mutableMapOf<KueueTopic, MutableList<String>>()
val subscriptions = topics.map { topic ->
kueue.subscribe(topic) { consumed.computeIfAbsent(topic) { mutableListOf() }.add(it) }
kueue.subscribe(topic) {
mutex.withLock { consumed.computeIfAbsent(topic) { mutableListOf() }.add(it) }
}
}

val messagesPerTopic = 100
Expand All @@ -59,66 +68,75 @@ class KueueManagerImplTest : IntegrationTest() {
}
}

// @Test
// fun `should resubscribe after connection closure`(): Unit = runBlocking {
// val connection1 = DriverManager.getConnection(psql.jdbcUrl, psql.username, psql.password)
// val connection2 = DriverManager.getConnection(psql.jdbcUrl, psql.username, psql.password)
//
// val remainingConnections = mutableListOf(connection1, connection2)
// val connectionFactory = object : KueueConnectionFactory {
// override suspend fun create(): KueueConnection {
// val connection = remainingConnections.removeFirst()
// return JdbcKueueConnection(connection.unwrap(BaseConnection::class.java))
// }
// }
//
// val topics = List(10) { KueueTopic(UUID.randomUUID().toString()) }
//
// val kueueManager = KueueImpl(connectionFactory)
// try {
// val producers = topics.map { kueueManager.createProducer(it) }
//
// val mutex = Mutex()
// val consumed = mutableMapOf<KueueTopic, MutableList<String>>()
// val subscriptions = topics.map { topic ->
// kueueManager.subscribe(topic) {
// mutex.withLock {
// consumed.computeIfAbsent(topic) { mutableListOf() }.add(it)
// }
// }
// }
//
// val batch1 = topics.associateWith { List(10) { UUID.randomUUID().toString() } }
// val batch2 = topics.associateWith { List(10) { UUID.randomUUID().toString() } }
//
// producers.forEach { producer -> batch1.getValue(producer.topic).forEach { producer.send(it) } }
//
// eventually(5.seconds) {
// val currentConsumed = mutex.withLock { consumed.toMap() }
// for ((topic, messages) in currentConsumed) {
// messages shouldContainExactly batch1.getValue(topic)
// }
//
// delay(10)
// }
//
// consumed.clear()
// connection1.close()
//
// producers.forEach { producer -> batch2.getValue(producer.topic).forEach { producer.send(it) } }
// eventually(5.seconds) {
// val currentConsumed = mutex.withLock { consumed.toMap() }
// for ((topic, messages) in currentConsumed) {
// messages shouldContainExactly batch2.getValue(topic)
// }
//
// delay(10)
// }
//
// subscriptions.forEach { it.close() }
// } finally {
// kueueManager.close()
// connection2.close()
// }
// }
@Test
fun `should resubscribe after connection closure`(): Unit = runBlocking {
val connection1 = DriverManager.getConnection(psql.jdbcUrl, psql.username, psql.password)
val connection2 = DriverManager.getConnection(psql.jdbcUrl, psql.username, psql.password)

val remainingConnections = mutableListOf(connection1, connection2)
val connectionFactory = object : KueueConnectionProvider<BaseConnection, JdbcKueueConnection> {
override suspend fun create(): JdbcKueueConnection {
val connection = remainingConnections.removeFirst()
return JdbcKueueConnection(connection.unwrap(BaseConnection::class.java))
}

override suspend fun close(connection: JdbcKueueConnection) = Unit
}

val topics = List(10) { KueueTopic(UUID.randomUUID().toString()) }

val kueue = KueueImpl(connectionFactory, JdbcKueueConnectionPubSub())
try {
val mutex = Mutex()
val consumed = mutableMapOf<KueueTopic, MutableList<String>>()
val subscriptions = topics.map { topic ->
kueue.subscribe(topic) {
mutex.withLock {
consumed.computeIfAbsent(topic) { mutableListOf() }.add(it)
}
}
}

val batch1 = topics.associateWith { List(10) { UUID.randomUUID().toString() } }
val batch2 = topics.associateWith { List(10) { UUID.randomUUID().toString() } }

for ((topic, messages) in batch1) {
for (message in messages) {
kueue.send(topic, message)
}
}

eventually(5.seconds) {
val currentConsumed = mutex.withLock { consumed.toMap() }
for ((topic, messages) in currentConsumed) {
messages shouldContainExactly batch1.getValue(topic)
}

delay(10)
}

consumed.clear()
connection1.close()

for ((topic, messages) in batch2) {
for (message in messages) {
kueue.send(topic, message)
}
}

eventually(5.seconds) {
val currentConsumed = mutex.withLock { consumed.toMap() }
for ((topic, messages) in currentConsumed) {
messages shouldContainExactly batch2.getValue(topic)
}

delay(10)
}

subscriptions.forEach { it.close() }
} finally {
kueue.close()
connection2.close()
}
}
}

0 comments on commit 763cc1e

Please sign in to comment.