This project aims to provide a simple Kotlin Coroutines-based interface for messaging, built on top of PostgreSQL, consists of two sub-projects:
pg-kueue-log
-- a library for emulating event store using PostgreSQL, similar to the Kafka Producer/Consumer model.pg-kueue-pubsub
-- a simple library for working with PostgreSQL LISTEN/NOTIFY.
This module provides 2 main classes: KueueProducer
and KueueConsumer
, which mirror the Kafka Producer/Consumer model.
At the moment it only works on top of the JDBC, however it should be possible to add other SPIs in future.
repositories {
mavenCentral()
}
dependencies {
implementation("io.github.vooft:pg-kueue-log-jdbc:<version>")
}
Simple consumer usage:
// dataSource must be provided externally
val dataSource = createMyDataSource()
val kueueLog = KueueLog.jdbc(dataSource)
// here consumer will try to subscribe to the topic and group, if it does not exist, it will be created
// topic must exist before creating a consumer
val consumer = kueueLog.createConsumer(KueueTopic("my-topic"), KueueConsumerGroup("my-group"))
// consumer.messages is an instance of ReceiveChannel and could be consumed using a for loop
for (message in consumer.messages) {
println(message)
}
Simple producer usage:
// dataSource must be provided externally
val dataSource = createMyDataSource()
val kueueLog = KueueLog.jdbc(dataSource)
// topic must exist before creating a producer
val producer = kueueLog.createProducer(KueueTopic("test"))
// key is mandatory and is used to partition messages
producer.produce(KueueKey("my-key"), KueueValue("my-value"))
It is possible to have multiple consumers in the same group and they will be balanced in the same was as Kafka consumers:
- One consumer will be chosen a leader (based on acquiring a lock in a table)
- Leader will be responsible for assigning partitions to other consumers
- Leader will be responsible for monitoring liveness of other consumers and rebalancing if consumer connects or disconnects
- Individual consumers will be responsible for committing offsets based on user's request
- Please note, if you don't commit the offset, after rebalancing the consumer will start consuming from the last committed offset
Kotlin Coroutines PostgresSQL-based message queue using LISTEN/NOTIFY
Everything is String-based and for now just follows normal LISTEN/NOTIFY rules.
repositories {
mavenCentral()
}
dependencies {
implementation("io.github.vooft:pg-kueue-jdbc:<version>")
}
Simple usage:
val dataSource = createMyDataSource()
val kueue = KueuePubSub.jdbc(dataSource)
val subscription = kueue.subscribe(KueueTopic("my_topic")) { message: String ->
println("Received message: $message")
}
kueue.publish(KueueTopic("my_topic"), "Hello, world!")
// will print after a tiny delay: "Received message: Hello, world!"
You can close subscription, if you would like to stop a particular listener:
subscription.close()
But it is not necessary if the the subscription should exist for the whole Kueue lifecycle.
All subscriptions will be closed automatically when Kueue is closed:
kueue.close()
To publish a message using existing transaction, you should provide the transactional connection.
Normally, API accepts a instance of a wrapped connection KueueConnection
, there is a helper method to create it:
val transactionalConnection = myBeginTransaction()
kueue.publish(KueueTopic("my_topic"), "Hello, world!", kueue.wrap(transactionalConnection))
There is also an extension function for a specific library to simplify transactional publishing:
val transactionalConnection = myBeginTransaction()
kueue.publish(KueueTopic("my_topic"), "Hello, world!", transactionalConnection) // an extension function must be imported explicitly