Skip to content

Commit

Permalink
Merge pull request #30 from bluegroundltd/feature/thread-priority
Browse files Browse the repository at this point in the history
feat: Allow specifying thread priority
  • Loading branch information
chris-asl authored Aug 23, 2024
2 parents 9c6b45f + 04e4da3 commit 45b7acf
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 6 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Transactional Outbox is published on `mavenCentral`. In order to use it just add

```gradle
implementation("io.github.bluegroundltd:transactional-outbox-core:2.0.4")
implementation("io.github.bluegroundltd:transactional-outbox-core:2.1.0")
```

Expand Down Expand Up @@ -55,6 +55,12 @@ class OutboxConfiguration(
return TransactionalOutboxBuilder
.make(clock)
.withHandlers(outboxHandlers)
// Ideally you want your thread pool < db connection pool size or similar
.withThreadPoolSize(5)
// Override the default Thread priority with caution
// If you are facing performance issues, you should
// adjust the thread pool size first.
// .withThreadPriority(Thread.NORM_PRIORITY)
.withMonitorLocksProvider(monitorLocksProvider)
.withCleanupLocksProvider(cleanupLocksProvider)
.withStore(outboxStore)
Expand Down
2 changes: 1 addition & 1 deletion core/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GROUP=io.github.bluegroundltd
POM_ARTIFACT_ID=transactional-outbox-core
VERSION_NAME=2.0.4
VERSION_NAME=2.1.0

POM_NAME=Transactional Outbox Core
POM_DESCRIPTION=Easily implement the transactional outbox pattern in your JVM application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class TransactionalOutboxBuilder(
BuildStep {
private val handlers: MutableMap<OutboxType, OutboxHandler> = mutableMapOf()
private var threadPoolSize: Int? = null
private var threadPriority: Int? = null
private var threadPoolTimeOut: Duration = DEFAULT_THREAD_POOL_TIMEOUT
private var decorators: MutableList<OutboxItemProcessorDecorator> = mutableListOf()
private lateinit var monitorLocksProvider: OutboxLocksProvider
Expand Down Expand Up @@ -132,6 +133,14 @@ class TransactionalOutboxBuilder(
return this
}

/**
* Sets the priority for the threads in the thread pool.
*/
override fun withThreadPriority(threadPriority: Int): BuildStep {
this.threadPriority = threadPriority
return this
}

/**
* Sets the thread pool timeout upon shutdown for the outbox.
*/
Expand All @@ -156,7 +165,7 @@ class TransactionalOutboxBuilder(
* Builds the outbox.
*/
override fun build(): TransactionalOutbox {
val executorServiceFactory = FixedThreadPoolExecutorServiceFactory(threadPoolSize)
val executorServiceFactory = FixedThreadPoolExecutorServiceFactory(threadPoolSize, threadPriority)
val outboxItemFactory = OutboxItemFactory(clock, handlers.toMap(), rerunAfterDuration)

return TransactionalOutboxImpl(
Expand Down Expand Up @@ -198,6 +207,7 @@ interface InstantOutboxPublisherStep {

interface BuildStep {
fun withThreadPoolSize(threadPoolSize: Int): BuildStep
fun withThreadPriority(threadPriority: Int): BuildStep
fun withThreadPoolTimeOut(threadPoolTimeOut: Duration): BuildStep
fun addProcessorDecorator(decorator: OutboxItemProcessorDecorator): BuildStep
fun build(): TransactionalOutbox
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import java.util.concurrent.Executors
*/
internal class FixedThreadPoolExecutorServiceFactory(
threadPoolSize: Int? = null,
threadPriority: Int? = null,
private val threadNameFormat: String = DEFAULT_THREAD_NAME_FORMAT
) {
private val threadPoolSize = threadPoolSize ?: DEFAULT_THREAD_POOL_SIZE
private val threadPriority = threadPriority ?: Thread.NORM_PRIORITY

companion object {
private const val DEFAULT_THREAD_POOL_SIZE = 10
Expand All @@ -25,7 +27,7 @@ internal class FixedThreadPoolExecutorServiceFactory(
logger.debug("Creating fixed thread pool with size: $threadPoolSize and name format \"$threadNameFormat\"")
return Executors.newFixedThreadPool(
threadPoolSize,
ThreadFactoryBuilder().setNameFormat(threadNameFormat).build()
ThreadFactoryBuilder().setNameFormat(threadNameFormat).setPriority(threadPriority).build()
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class TransactionalOutboxImplSpec extends Specification {
instantOutboxPublisher,
outboxItemFactory,
DURATION_ONE_HOUR,
new FixedThreadPoolExecutorServiceFactory(1, "").make(),
new FixedThreadPoolExecutorServiceFactory(1, null, "").make(),
[],
DURATION_ONE_NANO,
processingHostComposer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class FixedThreadPoolExecutorServiceFactorySpec extends UnitTestSpecification {
def "Should create a FixedThreadPool ExecutorService with custom configuration"() {
given:
def expectedThreadPoolSize = 5
def expectedThreadPriority = Thread.MIN_PRIORITY

when:
def factory = new FixedThreadPoolExecutorServiceFactory(5, "")
def factory = new FixedThreadPoolExecutorServiceFactory(5, Thread.MIN_PRIORITY, "")

then:
def executorService = factory.make()
Expand All @@ -43,5 +44,6 @@ class FixedThreadPoolExecutorServiceFactorySpec extends UnitTestSpecification {
and:
threadPoolExecutorService.getCorePoolSize() == expectedThreadPoolSize
threadPoolExecutorService.getMaximumPoolSize() == expectedThreadPoolSize
threadPoolExecutorService.getThreadFactory().newThread({}).getPriority() == expectedThreadPriority
}
}

0 comments on commit 45b7acf

Please sign in to comment.