Skip to content

Commit

Permalink
Merge pull request #23 from bluegroundltd/feat/add-locking-in-cleanup
Browse files Browse the repository at this point in the history
feat: add locking in cleanup
  • Loading branch information
chris-asl authored Jun 19, 2024
2 parents 5f2f505 + fba98cb commit cb983f7
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 53 deletions.
17 changes: 11 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Transactional Outbox is a library that provides a simple way to implement
the [Transactional Outbox Pattern](https://microservices.io/patterns/data/transactional-outbox.html) in your
application, developed by Blueground.

Api Docs: https://bluegroundltd.github.io/transactional-outbox/
API Docs: https://bluegroundltd.github.io/transactional-outbox/

## Table of Contents

Expand All @@ -27,7 +27,7 @@ Transactional Outbox is published on `mavenCentral`. In order to use it just add

```gradle
implementation("io.github.bluegroundltd:transactional-outbox-core:1.0.0")
implementation("io.github.bluegroundltd:transactional-outbox-core:2.0.0")
```

Expand All @@ -49,18 +49,20 @@ class OutboxConfiguration(

@Bean
fun transactionalOutbox(): TransactionalOutbox {
val locksProvider = OutboxLocksProvider(postgresLockDao, APPLICATION_SPECIFIC_ID)
val monitorLocksProvider = OutboxLocksProvider(postgresLockDao, MONITOR_APPLICATION_SPECIFIC_ID)
val cleanupLocksProvider = OutboxLocksProvider(postgresLockDao, CLEANUP_APPLICATION_SPECIFIC_ID)

return TransactionalOutboxBuilder
.make(clock)
.withHandlers(outboxHandlers)
.withLocksProvider(locksProvider)
.withMonitorLocksProvider(monitorLocksProvider)
.withCleanupLocksProvider(cleanupLocksProvider)
.withStore(outboxStore)
.build()
}
}

private class OutboxLocksProvider(
private class OutboxLocksProviderImpl(
private val postgresLockDao: PostgresLockDao,
private val id: Long
) : OutboxLocksProvider {
Expand All @@ -76,7 +78,10 @@ private class OutboxLocksProvider(

### Creating a new Outbox Handler

Then you can create a new `OutboxHandler` that will be responsible for processing the `Outbox` entries:
Then you can create a new `OutboxHandler` that will be responsible for processing the `Outbox` entries.
Below you can see a barebones handler, but there's also a utility handler, which uses JSON (de)serialization and
reduces the outbox handlers boilerplate code. Refer to [SimpleOutboxHandler](https://bluegroundltd.github.io/transactional-outbox/core/io.github.bluegroundltd.outbox/-simple-outbox-handler/index.html) in our [docs page](
https://bluegroundltd.github.io/transactional-outbox/index.html).

```kotlin
enum class MyOutboxType: OutboxType {
Expand Down
55 changes: 55 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Upgrade Guide

## v.2.x.x

### v.2.0.0 - Completed outbox items cleanup coordination

Release 2.0.0 introduces coordination in the cleanup process across instances using locks.

#### `TransactionalOutboxBuilder#withLocksProvider` has been removed in favor of `withMonitorLocksProvider`
The reason for this breaking change was the need to introduce a different locks provider for the completed outbox items cleanup process.
We could reuse the same locks provider for both the monitor and cleanup process, but this would entail serialized execution of the two processes, which was not desirable.
The requirement for the new locks provider is solely to be independent of the locks provider used in the monitor process.
For example, if the locks provider implementation is using Postgres advisory locks, the monitor and the cleanup locks should use a different lock identifier.

**Required changes**
The `TransactionalOutboxBuilder` call needs to be updated from
```kotlin
TransactionalOutboxBuilder
.make(clock)
.withHandlers(outboxHandlers)
.withLocksProvider(locksProvider)
.withStore(outboxStore)
.build()
```
to
```kotlin
TransactionalOutboxBuilder
.make(clock)
.withHandlers(outboxHandlers)
.withMonitorLocksProvider(PostgresOutboxLocksProvider(LOCKS_MONITOR_ID))
.withCleanupLocksProvider(PostgresOutboxLocksProvider(LOCKS_CLEANUP_ID))
.withStore(outboxStore)
.build()
```
N.B.: The above assumes that the locks provider implementation is using Postgres advisory locks.

## v.1.x.x

### v.1.0.0 - Completed outbox items cleanup

Release 1.0 introduces a cleanup process for the outbox items that have been successfully processed, thus reducing the size of the outbox table, which can grow quite large.
When the outbox items are processes successfully, in addition to be marked as completed, their `OutboxItem.deleteAfter` field is set to `now() + retentionPeriod`.
The cleanup process, like monitor, should be run periodically, depending on your needs. Once run, it deletes the completed
outbox items whose `deleteAfter` is earlier than the current time.

It is advisable to manually delete the already completed outbox items before upgrading to 1.0.0, as the cleanup process
will issue a deletion, which may be quite heavy in terms of I/O operations, hence timeouts may occur on the first run.

**Required changes**
In the `OutboxStore` implementing class, the `deleteCompletedItems(now: Instant)` method needs to be implemented.
The method should simply delete the outbox items with status `COMPLETED` with a `deleteAfter` earlier than the provided `now` parameter.

Finally, the retention duration period can be defined per outbox handler for flexibility.
A new `OutboxHandler` method has been added `getRetentionDuration(): Duration` which should return the retention period for the outbox items of the handler.
Feel free to look the [SimpleOutboxHandler](./core/src/main/kotlin/io/github/bluegroundltd/outbox/SimpleOutboxHandler.kt) for an example.
2 changes: 1 addition & 1 deletion core/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GROUP=io.github.bluegroundltd
POM_ARTIFACT_ID=transactional-outbox-core
VERSION_NAME=1.0.0
VERSION_NAME=2.0.0

POM_NAME=Transactional Outbox Core
POM_DESCRIPTION=Easily implement the transactional outbox pattern in your JVM application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import kotlin.properties.Delegates
* return TransactionalOutboxBuilder
* .make(clock)
* .withHandlers(outboxHandlers)
* .withLocksProvider(locksProvider)
* .withMonitorLocksProvider(monitorLocksProvider)
* .withCleanupLocksProvider(cleanupLocksProvider)
* .withStore(outboxStore)
* .withInstantOutboxPublisher(instantOutboxPublisher)
* .build()
Expand All @@ -28,12 +29,18 @@ import kotlin.properties.Delegates
class TransactionalOutboxBuilder(
private val clock: Clock,
private val rerunAfterDuration: Duration = DEFAULT_RERUN_AFTER_DURATION
) : OutboxHandlersStep, LocksProviderStep, StoreStep, InstantOutboxPublisherStep, BuildStep {
) : OutboxHandlersStep,
MonitorLocksProviderStep,
CleanupLocksProviderStep,
StoreStep,
InstantOutboxPublisherStep,
BuildStep {
private val handlers: MutableMap<OutboxType, OutboxHandler> = mutableMapOf()
private var threadPoolSize by Delegates.notNull<Int>()
private var threadPoolTimeOut: Duration = DEFAULT_THREAD_POOL_TIMEOUT
private var decorators: MutableList<OutboxItemProcessorDecorator> = mutableListOf()
private lateinit var locksProvider: OutboxLocksProvider
private lateinit var monitorLocksProvider: OutboxLocksProvider
private lateinit var cleanupLocksProvider: OutboxLocksProvider
private lateinit var store: OutboxStore
private lateinit var instantOutboxPublisher: InstantOutboxPublisher

Expand All @@ -53,7 +60,7 @@ class TransactionalOutboxBuilder(
/**
* Sets the handlers for the outbox.
*/
override fun withHandlers(handlers: Set<OutboxHandler>): LocksProviderStep {
override fun withHandlers(handlers: Set<OutboxHandler>): MonitorLocksProviderStep {
validateNoDuplicateHandlerSupportedTypes(handlers)
handlers.associateByTo(this.handlers) { it.getSupportedType() }
return this
Expand Down Expand Up @@ -87,10 +94,18 @@ class TransactionalOutboxBuilder(
}

/**
* Sets the locks provider for the outbox.
* Sets the locks provider for the outbox monitor runs.
*/
override fun withLocksProvider(locksProvider: OutboxLocksProvider): StoreStep {
this.locksProvider = locksProvider
override fun withMonitorLocksProvider(locksProvider: OutboxLocksProvider): CleanupLocksProviderStep {
this.monitorLocksProvider = locksProvider
return this
}

/**
* Sets the locks provider for the outbox cleanup runs.
*/
override fun withCleanupLocksProvider(locksProvider: OutboxLocksProvider): StoreStep {
this.cleanupLocksProvider = locksProvider
return this
}

Expand Down Expand Up @@ -148,7 +163,8 @@ class TransactionalOutboxBuilder(
return TransactionalOutboxImpl(
clock,
handlers.toMap(),
locksProvider,
monitorLocksProvider,
cleanupLocksProvider,
store,
instantOutboxPublisher,
outboxItemFactory,
Expand All @@ -161,11 +177,15 @@ class TransactionalOutboxBuilder(
}

interface OutboxHandlersStep {
fun withHandlers(handlers: Set<OutboxHandler>): LocksProviderStep
fun withHandlers(handlers: Set<OutboxHandler>): MonitorLocksProviderStep
}

interface MonitorLocksProviderStep {
fun withMonitorLocksProvider(locksProvider: OutboxLocksProvider): CleanupLocksProviderStep
}

interface LocksProviderStep {
fun withLocksProvider(locksProvider: OutboxLocksProvider): StoreStep
interface CleanupLocksProviderStep {
fun withCleanupLocksProvider(locksProvider: OutboxLocksProvider): StoreStep
}

interface StoreStep {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

@SuppressWarnings("LongParameterList")
@SuppressWarnings("LongParameterList", "TooGenericExceptionCaught")
internal class TransactionalOutboxImpl(
private val clock: Clock,
private val outboxHandlers: Map<OutboxType, OutboxHandler>,
private val locksProvider: OutboxLocksProvider,
private val monitorLocksProvider: OutboxLocksProvider,
private val cleanupLocksProvider: OutboxLocksProvider,
private val outboxStore: OutboxStore,
private val instantOutboxPublisher: InstantOutboxPublisher,
private val outboxItemFactory: OutboxItemFactory,
Expand Down Expand Up @@ -66,12 +67,12 @@ internal class TransactionalOutboxImpl(

override fun monitor() {
if (inShutdownMode.get()) {
logger.info("$LOGGER_PREFIX Shutdown in process - no longer accepting items for processing")
logger.info("$LOGGER_PREFIX Shutdown in process, no longer accepting items for processing")
return
}

runCatching {
locksProvider.acquire()
monitorLocksProvider.acquire()

val items = fetchEligibleItems()
if (items.isEmpty()) {
Expand All @@ -87,8 +88,8 @@ internal class TransactionalOutboxImpl(
logger.error("$LOGGER_PREFIX Failure in monitor", it)
}

kotlin.runCatching { locksProvider.release() }.onFailure {
logger.error("$LOGGER_PREFIX Failed to release lock of $locksProvider", it)
runCatching { monitorLocksProvider.release() }.onFailure {
logger.error("$LOGGER_PREFIX Failed to release lock of $monitorLocksProvider", it)
}
}

Expand Down Expand Up @@ -172,7 +173,29 @@ internal class TransactionalOutboxImpl(
}

override fun cleanup() {
logger.info("$LOGGER_PREFIX Cleaning up outbox items")
outboxStore.deleteCompletedItems(Instant.now(clock))
if (inShutdownMode.get()) {
logger.info("$LOGGER_PREFIX Shutdown in process, deferring cleanup")
return
}

var wasLockingAcquired = false
try {
cleanupLocksProvider.acquire()
wasLockingAcquired = true

val now = Instant.now(clock)
logger.info("$LOGGER_PREFIX Cleaning up completed outbox items, with deleteAfter <= $now")
outboxStore.deleteCompletedItems(now)
} catch (exception: Exception) {
logger.error("$LOGGER_PREFIX Failure in cleanup", exception)
} finally {
if (wasLockingAcquired) {
try {
cleanupLocksProvider.release()
} catch (exception: Exception) {
logger.error("$LOGGER_PREFIX Failed to release cleanup lock ($cleanupLocksProvider)", exception)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class TransactionalOutboxImplSpec extends Specification {
OutboxType type = handler.getSupportedType()
Map<OutboxType, OutboxHandler> handlers = Map.of(type, handler)

OutboxLocksProvider locksProvider = Mock()
OutboxLocksProvider monitorLocksProvider = Mock()
OutboxLocksProvider cleanupLocksProvider = Mock()
OutboxStore store = Mock()
InstantOutboxPublisher instantOutboxPublisher = Mock()
OutboxItemFactory outboxItemFactory = Mock()
Expand All @@ -39,7 +40,8 @@ class TransactionalOutboxImplSpec extends Specification {
transactionalOutbox = new TransactionalOutboxImpl(
clock,
handlers,
locksProvider,
monitorLocksProvider,
cleanupLocksProvider,
store,
instantOutboxPublisher,
outboxItemFactory,
Expand Down Expand Up @@ -70,7 +72,7 @@ class TransactionalOutboxImplSpec extends Specification {
transactionalOutbox.monitor()

then:
1 * locksProvider.acquire()
1 * monitorLocksProvider.acquire()
1 * store.fetch(_) >> { OutboxFilter filter ->
with(filter) {
outboxPendingFilter.nextRunLessThan == now
Expand All @@ -86,7 +88,7 @@ class TransactionalOutboxImplSpec extends Specification {
}
return item
}
1 * locksProvider.release()
1 * monitorLocksProvider.release()
0 * _

when:
Expand All @@ -106,7 +108,7 @@ class TransactionalOutboxImplSpec extends Specification {
transactionalOutbox.monitor()

then:
1 * locksProvider.acquire()
1 * monitorLocksProvider.acquire()
1 * store.fetch(_) >> { OutboxFilter filter ->
with(filter) {
outboxPendingFilter.nextRunLessThan == now
Expand All @@ -121,7 +123,7 @@ class TransactionalOutboxImplSpec extends Specification {
}
return item
}
1 * locksProvider.release()
1 * monitorLocksProvider.release()
0 * _

when:
Expand All @@ -147,7 +149,58 @@ class TransactionalOutboxImplSpec extends Specification {
transactionalOutbox.cleanup()

then:
1 * cleanupLocksProvider.acquire()
1 * store.deleteCompletedItems(now)
1 * cleanupLocksProvider.release()
0 * _
}

def "Should early return from cleanup if in shutdown mode"() {
when:
transactionalOutbox.shutdown()
transactionalOutbox.cleanup()

then:
0 * _
}

def "Should handle an exception thrown from the cleanup store method"() {
when:
transactionalOutbox.cleanup()

then:
1 * cleanupLocksProvider.acquire()
1 * store.deleteCompletedItems(_) >> {
throw new InterruptedException()
}
1 * cleanupLocksProvider.release()
0 * _
noExceptionThrown()
}

def "Should handle an exception thrown during the cleanup release locks"() {
when:
transactionalOutbox.cleanup()

then:
1 * cleanupLocksProvider.acquire()
1 * store.deleteCompletedItems(_)
1 * cleanupLocksProvider.release() >> {
throw new InterruptedException()
}
0 * _
noExceptionThrown()
}

def "Should not release the lock in cleanup, after a failure in acquire"() {
when:
transactionalOutbox.cleanup()

then:
1 * cleanupLocksProvider.acquire() >> {
throw new InterruptedException()
}
0 * _
noExceptionThrown()
}
}
Loading

0 comments on commit cb983f7

Please sign in to comment.