Skip to content

Commit

Permalink
Added ability to persist events
Browse files Browse the repository at this point in the history
  • Loading branch information
vooft committed Aug 4, 2024
1 parent a4005ed commit 04c0379
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 27 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ There is also an extension function for a specific library to simplify transacti
kueue.send(KueueTopic("my_topic"), "Hello, world!", transactionalConnection) // an extension function must be imported explicitly
```

## Persistence
There is an optional persistence layer that can be used to store messages in the database.

Migration script is located in:
* [pg-kueue-utils/src/testFixtures/resources/database/1_kueue_events.sql](pg-kueue-utils/src/testFixtures/resources/database/1_kueue_events.sql)

# Additional modules
## jOOQ JDBC
There is a module that accepts a jOOQ `DSLContext` and provides a similar interface to the JDBC module.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ interface Kueue<C, KC : KueueConnection<C>> {
}

@JvmInline
value class KueueTopic(val channel: String)
value class KueueTopic(val topic: String)
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ interface KueueConnectionPubSub<KC : KueueConnection<*>> {
}
}

interface KueueEventPersister<KC : KueueConnection<*>> {
suspend fun persist(kueueConnection: KC, topic: KueueTopic, message: String)
}

data class KueueMessage(val topic: KueueTopic, val message: String)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.github.vooft.kueue.KueueConnection
import io.github.vooft.kueue.KueueConnectionProvider
import io.github.vooft.kueue.KueueConnectionPubSub
import io.github.vooft.kueue.KueueConnectionPubSub.ListenSubscription
import io.github.vooft.kueue.KueueEventPersister
import io.github.vooft.kueue.KueueMessage
import io.github.vooft.kueue.KueueTopic
import io.github.vooft.kueue.common.LoggerHolder
Expand All @@ -27,7 +28,8 @@ import kotlinx.coroutines.sync.withLock
@Suppress("detekt:UnusedPrivateProperty")
class KueueImpl<C, KC : KueueConnection<C>>(
private val connectionProvider: KueueConnectionProvider<C, KC>,
private val pubSub: KueueConnectionPubSub<KC>
private val pubSub: KueueConnectionPubSub<KC>,
private val persister: KueueEventPersister<KC>?
) : Kueue<C, KC> {

private val coroutineScope: CoroutineScope = CoroutineScope(SupervisorJob() + loggingExceptionHandler())
Expand All @@ -54,7 +56,10 @@ class KueueImpl<C, KC : KueueConnection<C>>(
override suspend fun send(topic: KueueTopic, message: String, kueueConnection: KC?) {
logger.debug { "Sending via $kueueConnection to $topic: $message" }
val connection = kueueConnection ?: getDefaultConnection()

persister?.persist(connection, topic, message)
pubSub.notify(connection, topic, message)

logger.debug { "Successfully sent via $kueueConnection to $topic: $message" }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.github.vooft.kueue.jdbc

import io.github.vooft.kueue.common.withVirtualThreadDispatcher
import org.intellij.lang.annotations.Language
import org.postgresql.core.BaseConnection

internal suspend fun BaseConnection.execute(@Language("SQL") query: String) = withVirtualThreadDispatcher {
createStatement().use {
it.execute(query)
}
}

internal suspend fun <T> JdbcKueueConnection.useUnwrapped(block: suspend (BaseConnection) -> T): T =
block(jdbcConnection.unwrap(BaseConnection::class.java))
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.intellij.lang.annotations.Language
import org.postgresql.core.BaseConnection
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

Expand All @@ -33,7 +31,7 @@ class JdbcKueueConnectionPubSub(private val bufferSize: Int = 100, private val n
try {
kueueConnection.useUnwrapped { connection ->
withNonCancellable {
val escapedChannel = connection.escapeIdentifier(topic.channel)
val escapedChannel = connection.escapeIdentifier(topic.topic)
val escapedMessage = connection.escapeString(message)

logger.debug { "Executing query" }
Expand Down Expand Up @@ -71,15 +69,15 @@ class JdbcKueueConnectionPubSub(private val bufferSize: Int = 100, private val n
override suspend fun listen(topic: KueueTopic) {
if (mutex.withLock { listenedTopics.add(topic) }) {
kueueConnection.useUnwrapped { connection ->
val escapedChannel = connection.escapeIdentifier(topic.channel)
val escapedChannel = connection.escapeIdentifier(topic.topic)
connection.execute("LISTEN $escapedChannel")
}
}
}

override suspend fun unlisten(topic: KueueTopic) {
kueueConnection.useUnwrapped { connection ->
val escapedChannel = connection.escapeIdentifier(topic.channel)
val escapedChannel = connection.escapeIdentifier(topic.topic)
connection.execute("UNLISTEN $escapedChannel")
}
}
Expand All @@ -105,12 +103,3 @@ class JdbcKueueConnectionPubSub(private val bufferSize: Int = 100, private val n

companion object : LoggerHolder()
}

private suspend fun BaseConnection.execute(@Language("SQL") query: String) = withVirtualThreadDispatcher {
createStatement().use {
it.execute(query)
}
}

private suspend fun <T> JdbcKueueConnection.useUnwrapped(block: suspend (BaseConnection) -> T): T =
block(jdbcConnection.unwrap(BaseConnection::class.java))
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.github.vooft.kueue.jdbc

import io.github.vooft.kueue.KueueEventPersister
import io.github.vooft.kueue.KueueTopic
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.UUID

class JdbcKueueEventPersister : KueueEventPersister<JdbcKueueConnection> {
override suspend fun persist(kueueConnection: JdbcKueueConnection, topic: KueueTopic, message: String) {
kueueConnection.useUnwrapped { connection ->
connection.prepareStatement("INSERT INTO kueue_events (id, topic, message, created_at) VALUES (?, ?, ?, ?)").use {
it.setObject(1, UUID.randomUUID())
it.setString(2, topic.topic)
it.setString(3, message)
it.setObject(4, OffsetDateTime.now(ZoneOffset.UTC))

it.execute()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import io.github.vooft.kueue.impl.KueueImpl
import java.sql.Connection
import javax.sql.DataSource

fun Kueue.Companion.jdbc(dataSource: DataSource): Kueue<Connection, JdbcKueueConnection> = KueueImpl(
fun Kueue.Companion.jdbc(dataSource: DataSource, persistEvents: Boolean): Kueue<Connection, JdbcKueueConnection> = KueueImpl(
connectionProvider = DataSourceKueueConnectionProvider(dataSource),
pubSub = JdbcKueueConnectionPubSub()
pubSub = JdbcKueueConnectionPubSub(),
persister = when (persistEvents) {
true -> JdbcKueueEventPersister()
false -> null
}
)

suspend fun Kueue<Connection, JdbcKueueConnection>.send(topic: KueueTopic, message: String, transactionalConnection: Connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ import io.github.vooft.kueue.impl.KueueImpl
import io.github.vooft.kueue.jdbc.DataSourceKueueConnectionProvider
import io.github.vooft.kueue.jdbc.JdbcKueueConnection
import io.github.vooft.kueue.jdbc.JdbcKueueConnectionPubSub
import io.github.vooft.kueue.jdbc.JdbcKueueEventPersister
import org.jooq.DSLContext
import java.sql.Connection
import javax.sql.DataSource

typealias JoodJdbcKueueConnection = JdbcKueueConnection
typealias JooqDataSourceKueueConnectionProvider = DataSourceKueueConnectionProvider
typealias JooqJdbcKueueConnectionPubSub = JdbcKueueConnectionPubSub
typealias JooqJdbcKueueEventPersister = JdbcKueueEventPersister

fun Kueue.Companion.jooq(dataSource: DataSource): Kueue<Connection, JoodJdbcKueueConnection> = KueueImpl(
fun Kueue.Companion.jooq(dataSource: DataSource, persistEvents: Boolean): Kueue<Connection, JoodJdbcKueueConnection> = KueueImpl(
connectionProvider = JooqDataSourceKueueConnectionProvider(dataSource),
pubSub = JooqJdbcKueueConnectionPubSub()
pubSub = JooqJdbcKueueConnectionPubSub(),
persister = when (persistEvents) {
true -> JooqJdbcKueueEventPersister()
false -> null
}
)

suspend fun Kueue<Connection, JoodJdbcKueueConnection>.send(topic: KueueTopic, message: String, transactionalDsl: DSLContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import io.github.vooft.kueue.KueueConnectionProvider
import io.github.vooft.kueue.KueueTopic
import io.github.vooft.kueue.jdbc.JdbcKueueConnection
import io.github.vooft.kueue.jdbc.JdbcKueueConnectionPubSub
import io.github.vooft.kueue.jdbc.JdbcKueueEventPersister
import io.github.vooft.kueue.jdbc.jdbc
import io.kotest.assertions.nondeterministic.continually
import io.kotest.assertions.nondeterministic.eventually
Expand All @@ -35,7 +36,7 @@ class JdbcTest : IntegrationTest() {
}
).use {
with(HappyPathTest) {
Kueue.jdbc(it).happyPathTest()
Kueue.jdbc(dataSource = it, persistEvents = true).happyPathTest()
}
}
}
Expand All @@ -59,7 +60,11 @@ class JdbcTest : IntegrationTest() {

val topics = List(10) { KueueTopic(UUID.randomUUID().toString()) }

val kueue = KueueImpl(connectionFactory, JdbcKueueConnectionPubSub())
val kueue = KueueImpl(
connectionProvider = connectionFactory,
pubSub = JdbcKueueConnectionPubSub(),
persister = JdbcKueueEventPersister()
)
try {
val mutex = Mutex()
val consumed = mutableMapOf<KueueTopic, MutableList<String>>()
Expand Down Expand Up @@ -126,7 +131,7 @@ class JdbcTest : IntegrationTest() {

val topic = KueueTopic(UUID.randomUUID().toString())

val kueue = Kueue.jdbc(dataSource)
val kueue = Kueue.jdbc(dataSource = dataSource, persistEvents = true)

try {
val mutex = Mutex()
Expand Down Expand Up @@ -209,7 +214,7 @@ class JdbcTest : IntegrationTest() {

val topic = KueueTopic(UUID.randomUUID().toString())

val kueue = Kueue.jdbc(dataSource)
val kueue = Kueue.jdbc(dataSource = dataSource, persistEvents = true)

try {
val mutex = Mutex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class JooqTest : IntegrationTest() {
}
).use {
with(HappyPathTest) {
Kueue.jooq(it).happyPathTest()
Kueue.jooq(dataSource = it, persistEvents = true).happyPathTest()
}
}
}
Expand All @@ -49,7 +49,7 @@ class JooqTest : IntegrationTest() {

val topic = KueueTopic(UUID.randomUUID().toString())

val kueue = Kueue.jooq(dataSource)
val kueue = Kueue.jooq(dataSource = dataSource, persistEvents = true)

try {
val mutex = Mutex()
Expand Down Expand Up @@ -132,7 +132,7 @@ class JooqTest : IntegrationTest() {

val topic = KueueTopic(UUID.randomUUID().toString())

val kueue = Kueue.jooq(dataSource)
val kueue = Kueue.jooq(dataSource = dataSource, persistEvents = true)

try {
val mutex = Mutex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ open class IntegrationTest {
withUsername("test")
withPassword("test")
start()

createConnection("").use { connection ->
for (migration in MIGRATIONS) {
connection.createStatement().use { statement ->
statement.execute(migration)
}
}
}
}
}
}

private const val KUEUE_EVENTS_MIGRATION = "/database/1_kueue_events.sql"
private val MIGRATIONS = listOf(KUEUE_EVENTS_MIGRATION).map { IntegrationTest::class.java.getResource(it).readText() }
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE kueue_events (
id UUID PRIMARY KEY,
topic TEXT NOT NULL,
message TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL
);

CREATE INDEX kueue_events_topic_created_at_idx ON kueue_events (topic, created_at);

0 comments on commit 04c0379

Please sign in to comment.