From 4b202ca6a9b7cb75592e9c6faa7e3ba90b9c76f9 Mon Sep 17 00:00:00 2001 From: Josep Boix Requesens Date: Mon, 6 Jan 2025 14:46:49 +0100 Subject: [PATCH] feat: consolidate event processing and optimize application performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The event processing system has been updated to improve readability and efficiency: - Switched from Reactor to Kotlin Flow for event handling to improve readability. - Events are now saved in batch, reducing concurrency and improving performance. - Removed reconciliation logic with a more aggressive in memory cache approach. - Replaced Caffeine with a simple LinkedHashMap cache to support LRU eviction policy. - Added configuration parameters to adjust the size of the cache, buffer and batch chunk size. These properties can also be modified with environment variables. - Removed actuator: benchmarks are now log, reducing memory consumption and eliminating the need for starting a server. - Updated dependencies to the latest compatible versions. - Replaced dynamic mapping in OpenSearch with a handmade mapping for document fields. This change reduces storage requirements, and eliminates unnecessary fields, making queries more logical and efficient. Co-authored-by: Gaëtan Muller --- Dockerfile | 2 +- README.md | 26 +- build.gradle.kts | 23 +- .../DataTransferApplicationRunner.kt | 51 ++- .../PillarboxDataTransferApplication.kt | 2 + .../benchmark/BenchmarkScheduledLogger.kt | 30 ++ .../benchmark/MovingAverageCalculator.kt | 41 +++ .../monitoring/benchmark/StatsTracker.kt | 56 +++ .../monitoring/benchmark/TimeTracker.kt | 61 ++++ .../pillarbox/monitoring/cache/LRUCache.kt | 56 +++ .../monitoring/common/RetryProperties.kt | 30 -- .../monitoring/concurrent/LockManager.kt | 36 -- .../concurrent/LockManagerConfiguration.kt | 19 - .../monitoring/event/EventDispatcherClient.kt | 107 +++--- .../EventDispatcherClientConfiguration.kt | 26 ++ .../monitoring/event/EventService.kt | 70 ---- .../event/config/RetryProperties.kt | 60 ++++ .../SseClientConfigurationProperties.kt | 20 -- .../monitoring/event/model/EventRequest.kt | 8 +- .../event/repository/ActionRepository.kt | 33 -- .../event/repository/EventRepository.kt | 11 + .../repository/OpenSearchConfiguration.kt | 2 + .../OpenSearchConfigurationProperties.kt | 2 +- .../monitoring/event/setup/AliasSetupTask.kt | 7 +- .../event/setup/OpenSearchSetupService.kt | 6 +- .../pillarbox/monitoring/flow/FlowUtils.kt | 41 +++ .../health/BenchmarkHealthIndicator.kt | 29 -- .../monitoring/health/Benchmarked.kt | 20 -- .../monitoring/health/BenchmarkingAspect.kt | 60 ---- .../health/MovingAverageCalculator.kt | 39 --- .../{LoggingUtils.kt => LoggingExtensions.kt} | 9 +- .../ElasticsearchRepositoryExtensions.kt | 26 ++ ...itional-spring-configuration-metadata.json | 10 - src/main/resources/application-local.yml | 4 + src/main/resources/application-prod.yml | 4 + src/main/resources/application.yml | 17 +- src/main/resources/opensearch/index.json | 37 ++ .../resources/opensearch/index_template.json | 330 +++++++++++++++++- src/main/resources/opensearch/ism_policy.json | 4 +- .../monitoring/cache/LRUCacheTest.kt | 42 +++ .../event/model/EventRequestTest.kt | 2 + 41 files changed, 961 insertions(+), 498 deletions(-) create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/BenchmarkScheduledLogger.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/MovingAverageCalculator.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/StatsTracker.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/TimeTracker.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/cache/LRUCache.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/common/RetryProperties.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManager.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManagerConfiguration.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClientConfiguration.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventService.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/RetryProperties.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/SseClientConfigurationProperties.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/ActionRepository.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/EventRepository.kt create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/flow/FlowUtils.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkHealthIndicator.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/Benchmarked.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkingAspect.kt delete mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/MovingAverageCalculator.kt rename src/main/kotlin/ch/srgssr/pillarbox/monitoring/log/{LoggingUtils.kt => LoggingExtensions.kt} (79%) create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/opensearch/ElasticsearchRepositoryExtensions.kt delete mode 100644 src/main/resources/META-INF/additional-spring-configuration-metadata.json create mode 100644 src/main/resources/application-local.yml create mode 100644 src/main/resources/application-prod.yml create mode 100644 src/test/kotlin/ch/srgssr/pillarbox/monitoring/cache/LRUCacheTest.kt diff --git a/Dockerfile b/Dockerfile index 264c316..410080d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,4 +18,4 @@ COPY --from=build /app/build/libs/app.jar app.jar RUN java -Djarmode=layertools -jar app.jar extract # JVM memory and garbage collection optimizations for containers -ENTRYPOINT ["java", "-Dsun.net.inetaddr.ttl=60", "-Dsun.net.inetaddr.negative.ttl=10", "-Xms512m", "-Xmx1024m", "-XX:+UseG1GC", "-XX:MaxGCPauseMillis=200", "-XX:+UseContainerSupport", "-jar", "/app.jar"] +ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -Dsun.net.inetaddr.ttl=60 -Dsun.net.inetaddr.negative.ttl=10 -XX:+UseContainerSupport --add-opens java.base/java.math=ALL-UNNAMED -jar /app.jar"] diff --git a/README.md b/README.md index e9637de..79b8fce 100644 --- a/README.md +++ b/README.md @@ -61,9 +61,6 @@ Alternatively, you can build and run the application using Docker: This project is a Kotlin-based Spring Boot application designed to connect to a Server-Sent Events ( SSE) endpoint, process incoming events, and store data in an OpenSearch index. -It leverages Spring Boot's reactive WebFlux framework and integrates custom health indicators using -Spring Boot Actuator to provide insights into application performance. - ### System Flow Overview The main loop of this service is illustrated in the following diagram: @@ -72,7 +69,6 @@ The main loop of this service is illustrated in the following diagram: sequenceDiagram participant DataTransfer participant SSEEndpoint - participant LockManager participant OpenSearch DataTransfer ->> OpenSearch: Initialize index DataTransfer ->> SSEEndpoint: Connects and listens to events @@ -80,7 +76,6 @@ sequenceDiagram SSEEndpoint --) DataTransfer: Send Event DataTransfer ->> LockManager: Acquire session lock DataTransfer ->> OpenSearch: Store event - DataTransfer -->> LockManager: Release session lock end ``` @@ -91,14 +86,10 @@ system: - [PillarboxDataTransferApplication.kt][main-entry-point]: The main entry point of the application that bootstraps and configures the service. -- [BenchmarkHealthIndicator.kt][health-indicator]: Monitors the performance of key operations, - providing real-time health metrics for the application. -- [LockManager.kt][lock-manager]: Ensures concurrency control by managing locks for different - sessions, enabling thread-safe operations. -- [SetupService.kt][setup-service]: Manages the initial setup of the OpenSearch index and the - application’s configuration for SSE processing. -- [SseClient.kt][sse-client]: Listens to the SSE endpoint, handling incoming events and managing - retries in case of connection failures. +- [OpenSearchSetupService.kt][setup-service]: Manages the initial setup of the OpenSearch index and + the application’s configuration for SSE processing. +- [EventDispatcherClient.kt][sse-client]: Listens to the SSE endpoint, handling incoming events and + managing retries in case of connection failures. Here’s a more concise description of the GitHub Actions setup without listing the steps: @@ -162,11 +153,6 @@ Refer to our [Contribution Guide](docs/CONTRIBUTING.md) for more detailed inform This project is licensed under the [MIT License](LICENSE). [main-entry-point]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt +[setup-service]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt +[sse-client]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt -[health-indicator]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkHealthIndicator.kt - -[lock-manager]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManager.kt - -[setup-service]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SetupService.kt - -[sse-client]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SseClient.kt diff --git a/build.gradle.kts b/build.gradle.kts index c3fdfcf..f5206cb 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,11 +4,12 @@ import java.util.Properties plugins { kotlin("jvm") version "2.0.10" kotlin("plugin.spring") version "2.0.10" - id("org.springframework.boot") version "3.3.4" - id("io.spring.dependency-management") version "1.1.6" + id("org.springframework.boot") version "3.4.1" + id("io.spring.dependency-management") version "1.1.7" id("io.gitlab.arturbosch.detekt") version "1.23.7" - id("org.jlleitschuh.gradle.ktlint") version "12.1.1" - id("org.jetbrains.kotlinx.kover") version "0.8.3" + id("org.jlleitschuh.gradle.ktlint") version "12.1.2" + id("org.jetbrains.kotlinx.kover") version "0.9.0" + id("com.github.ben-manes.versions") version "0.51.0" } group = "ch.srgssr.pillarbox" @@ -24,23 +25,23 @@ repositories { } dependencies { - implementation("org.springframework.boot:spring-boot-starter-aop") - implementation("org.springframework.boot:spring-boot-starter-actuator") - implementation("org.opensearch.client:spring-data-opensearch-starter:1.5.3") + // Dependencies + implementation("org.opensearch.client:spring-data-opensearch-starter:1.6.0") implementation("org.springframework.boot:spring-boot-starter-webflux") implementation("com.fasterxml.jackson.module:jackson-module-kotlin") implementation("org.jetbrains.kotlin:kotlin-reflect") + implementation("nl.basjes.parse.useragent:yauaa:7.29.0") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") - implementation("com.github.ben-manes.caffeine:caffeine") - implementation("nl.basjes.parse.useragent:yauaa:7.28.1") + + // Test Dependencies testImplementation("io.kotest:kotest-runner-junit5:5.9.1") testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("io.kotest.extensions:kotest-extensions-spring:1.3.0") testImplementation("org.jetbrains.kotlin:kotlin-test-junit5") testImplementation("io.mockk:mockk:1.13.13") - testImplementation("com.squareup.okhttp3:mockwebserver") - testImplementation("com.squareup.okhttp3:okhttp") + testImplementation("com.squareup.okhttp3:mockwebserver:4.12.0") + testImplementation("com.squareup.okhttp3:okhttp:4.12.0") testRuntimeOnly("org.junit.platform:junit-platform-launcher") } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt index f3e0a0d..4803f04 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/DataTransferApplicationRunner.kt @@ -2,11 +2,13 @@ package ch.srgssr.pillarbox.monitoring import ch.srgssr.pillarbox.monitoring.event.EventDispatcherClient import ch.srgssr.pillarbox.monitoring.event.setup.OpenSearchSetupService -import ch.srgssr.pillarbox.monitoring.exception.RetryExhaustedException import ch.srgssr.pillarbox.monitoring.log.logger import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.runBlocking import org.springframework.boot.ApplicationArguments import org.springframework.boot.ApplicationRunner import org.springframework.context.annotation.Profile @@ -43,33 +45,22 @@ class DataTransferApplicationRunner( * * @param args Application arguments. */ - override fun run(args: ApplicationArguments?) { - openSearchSetupService.start().subscribe( - // Success - { this.startSseClient() }, - // Error - { - logger.error("Failed to connect to OpenSearch:", it) - CoroutineScope(Dispatchers.IO).launch { - terminationService.terminateApplication() - } - }, - ) - } + override fun run(args: ApplicationArguments?) = + runBlocking { + try { + openSearchSetupService + .start() + .asFlow() + .catch { + logger.error("Failed to connect to OpenSearch:", it) + throw it + }.launchIn(CoroutineScope(Dispatchers.Default)) + .join() - private fun startSseClient() { - eventDispatcherClient.start().subscribe( - // Success - { }, - // Error - { - if (it is RetryExhaustedException) { - logger.error("Failed to connect to SSE after retries, terminating application.", it) - CoroutineScope(Dispatchers.IO).launch { - terminationService.terminateApplication() - } - } - }, - ) - } + logger.info("All setup tasks are completed, starting SSE client...") + eventDispatcherClient.start().join() + } finally { + terminationService.terminateApplication() + } + } } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt index dc690d4..6e9cbf5 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt @@ -4,12 +4,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration import org.springframework.boot.context.properties.ConfigurationPropertiesScan import org.springframework.boot.runApplication +import org.springframework.scheduling.annotation.EnableScheduling /** * Main entry point for the Pillarbox QoS Data Transfer application. */ @SpringBootApplication(exclude = [ElasticsearchDataAutoConfiguration::class]) @ConfigurationPropertiesScan +@EnableScheduling class PillarboxDataTransferApplication /** diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/BenchmarkScheduledLogger.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/BenchmarkScheduledLogger.kt new file mode 100644 index 0000000..b604eb2 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/BenchmarkScheduledLogger.kt @@ -0,0 +1,30 @@ +package ch.srgssr.pillarbox.monitoring.benchmark + +import ch.srgssr.pillarbox.monitoring.log.info +import ch.srgssr.pillarbox.monitoring.log.logger +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Component +import java.util.concurrent.TimeUnit + +/** + * A scheduled logger that periodically logs the average execution times + * and stats regarding the number of processed events. + */ +@Component +class BenchmarkScheduledLogger { + private companion object { + /** + * Logger instance for logging within this component. + */ + private val logger = logger() + } + + /** + * The scheduled logging function, executes every minute. + */ + @Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES) + fun logBenchmarkAverages() { + logger.info { "Benchmark averages: ${TimeTracker.averages}" } + logger.info { "Latest stats per minute: ${StatsTracker.getAndResetAll()}" } + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/MovingAverageCalculator.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/MovingAverageCalculator.kt new file mode 100644 index 0000000..2e768d1 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/MovingAverageCalculator.kt @@ -0,0 +1,41 @@ +package ch.srgssr.pillarbox.monitoring.benchmark + +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.LongAdder + +/** + * A thread-safe class for calculating the moving average of a stream of long values. + * + * This implementation accumulates values until the average is calculated, at which point + * the internal state (sum and count) is reset. + */ +internal class MovingAverageCalculator { + private val sum = LongAdder() + private val count = AtomicLong() + + /** + * Adds a new value to the moving average calculation. + * + * @param n The value to be added to the moving average calculation. + */ + fun add(n: Long) { + sum.add(n) + count.incrementAndGet() + } + + /** + * Calculates the average of the accumulated values and resets the internal state. + * + * This method atomically retrieves and resets both the accumulated sum and count. + * If no values have been added, the method returns `Double.NaN`. + * + * @return The average of the accumulated values, or `NaN` if no values were added. + */ + val average: Double + get() = + synchronized(this) { + val totalCount = count.getAndSet(0) + val total = sum.sumThenReset().toDouble() + if (totalCount == 0L) Double.NaN else total / totalCount + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/StatsTracker.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/StatsTracker.kt new file mode 100644 index 0000000..4989810 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/StatsTracker.kt @@ -0,0 +1,56 @@ +package ch.srgssr.pillarbox.monitoring.benchmark + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +/** + * Utility object for tracking and aggregating statistics. + * + * This class uses a [ConcurrentHashMap] to store statistics counters identified by unique keys. + * It provides methods to increment, retrieve, and reset these counters. + */ +object StatsTracker { + private val stats = ConcurrentHashMap() + + /** + * Increments the count for a specific key by the specified delta. + * If the key does not exist, it is initialized to 0 before incrementing. + * + * @param key The unique identifier for the statistic. + * @param delta The amount to increment the counter by. Defaults to 1. + */ + fun increment( + key: String, + delta: Long = 1, + ) { + stats.computeIfAbsent(key) { AtomicLong(0) }.addAndGet(delta) + } + + /** + * Increments the count for a specific key by the specified delta. + * + * @param key The unique identifier for the statistic. + * @param delta The amount to increment the counter by, as an [Int]. + */ + fun increment( + key: String, + delta: Int, + ) { + increment(key, delta.toLong()) + } + + /** + * Retrieves the current count for a specific key. + * + * @param key The unique identifier for the statistic. + * @return The current count for the key, or 0 if the key does not exist. + */ + operator fun get(key: String): Long = stats[key]?.get() ?: 0L + + /** + * Retrieves all current statistics as a map and resets their counters to 0. + * + * @return A map containing the keys and their corresponding counts before reset. + */ + fun getAndResetAll(): Map = stats.mapValues { it.value.getAndSet(0) } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/TimeTracker.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/TimeTracker.kt new file mode 100644 index 0000000..1b837f3 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/benchmark/TimeTracker.kt @@ -0,0 +1,61 @@ +package ch.srgssr.pillarbox.monitoring.benchmark + +import ch.srgssr.pillarbox.monitoring.log.logger +import ch.srgssr.pillarbox.monitoring.log.trace +import java.util.concurrent.ConcurrentHashMap +import kotlin.time.measureTimedValue + +/** + * Utility object for tracking and logging execution times of code blocks with support for moving averages. + */ +object TimeTracker { + private val logger = logger() + + private val movingAverages = ConcurrentHashMap() + + /** + * Tracks the execution time of a code block. + * + * @param T The return type of the code block. + * @param signature A unique identifier (e.g., method name) for tracking and logging purposes. + * @param block The suspending code block to measure. + * + * @return The result of the code block. + */ + suspend fun track( + signature: String, + block: suspend () -> T, + ): T { + val (result, executionTime) = measureTimedValue { block() } + val calculator = + movingAverages.computeIfAbsent(signature) { + MovingAverageCalculator() + } + + calculator.add(executionTime.inWholeMilliseconds) + logger.trace { "$signature took $executionTime" } + + return result + } + + /** + * Provides the average execution times for all monitored methods. + * + * @return A map where the keys are method signatures and the values are the average execution times. + */ + val averages get() = movingAverages.mapValues { it.value.average } +} + +/** + * Convenience function to track execution time of a code block using [TimeTracker]. + * + * @param T The return type of the code block. + * @param signature A unique identifier (e.g., method name) for tracking and logging purposes. + * @param block The suspending code block to measure. + * + * @return The result of the code block. + */ +suspend fun timed( + signature: String, + block: suspend () -> T, +): T = TimeTracker.track(signature, block) diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/cache/LRUCache.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/cache/LRUCache.kt new file mode 100644 index 0000000..86ef088 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/cache/LRUCache.kt @@ -0,0 +1,56 @@ +package ch.srgssr.pillarbox.monitoring.cache + +import java.util.Collections + +/** + * A simple implementation of an LRU (Least Recently Used) cache that uses a [LinkedHashMap] + * with `accessOrder` set to `true`. + * + * This implementation is thread-safe through synchronization, it may not perform well + * under high concurrency. + * + * @param K The type of the keys maintained by this cache. + * @param V The type of the values stored in this cache. + * + * @property capacity The maximum number of entries this cache can hold. Once the capacity is exceeded, + * the least recently used entry is evicted. + */ +class LRUCache( + private val capacity: Int, +) { + private val cache = + Collections.synchronizedMap( + object : LinkedHashMap(capacity, 0.75f, true) { + /** + * Removes the eldest entry when the size of the cache exceeds the specified [capacity]. + * + * @param eldest The eldest entry in the map. + * @return `true` if the eldest entry should be removed, `false` otherwise. + */ + override fun removeEldestEntry(eldest: MutableMap.MutableEntry?): Boolean = size > this@LRUCache.capacity + }, + ) + + /** + * Retrieves the value associated with the given [key], or returns `null` if the key + * does not exist in the cache. + * + * @param key The key whose associated value is to be returned. + * + * @return The value associated with the specified [key], or `null` if the key is not found. + */ + fun get(key: K): V? = cache[key] + + /** + * Adds a key-value pair to the cache. + * + * @param key The key to be added or updated in the cache. + * @param value The value to be associated with the [key]. + */ + fun put( + key: K, + value: V, + ) { + cache[key] = value + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/common/RetryProperties.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/common/RetryProperties.kt deleted file mode 100644 index 1ef9314..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/common/RetryProperties.kt +++ /dev/null @@ -1,30 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.common - -import reactor.util.retry.Retry -import reactor.util.retry.RetryBackoffSpec -import java.time.Duration - -/** - * Configuration class for retry behavior. - * - * @property maxAttempts The maximum number of retry attempts. Defaults to 5. - * @property initialInterval The initial interval between retry attempts. Defaults to 5 seconds. - * @property maxInterval The maximum interval between retry attempts. Defaults to 1 minute. - */ -data class RetryProperties( - val maxAttempts: Long = 5, - val initialInterval: Duration = Duration.ofSeconds(5), - val maxInterval: Duration = Duration.ofMinutes(1), -) { - /** - * Creates a [RetryBackoffSpec] based on the retry properties. - * This specification defines the backoff strategy for retries, - * including the number of attempts and interval timings. - * - * @return A configured [RetryBackoffSpec] for use with Reactor's retry mechanism. - */ - fun create(): RetryBackoffSpec = - Retry - .backoff(maxAttempts, initialInterval) - .maxBackoff(maxInterval) -} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManager.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManager.kt deleted file mode 100644 index 571111a..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManager.kt +++ /dev/null @@ -1,36 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.concurrent - -import com.github.benmanes.caffeine.cache.Caffeine -import kotlinx.coroutines.sync.Mutex -import org.springframework.stereotype.Component -import java.time.Duration - -/** - * The LockManager class is responsible for managing session-based locks - * using Kotlin coroutines' Mutex to ensure synchronized access to shared resources. - * - * The locks are stored in a cache that automatically expires entries after - * a configurable time of inactivity, preventing memory leaks by removing stale locks. - * - * @param configuration The configuration object used to set the time-to-live (TTL) for each lock. - */ -@Component -class LockManager( - configuration: LockManagerConfiguration, -) { - private val sessionLocks = - Caffeine - .newBuilder() - .expireAfterAccess(Duration.ofSeconds(configuration.ttl)) - .build() - - /** - * Retrieves the Mutex associated with the given session ID. If no Mutex exists for the session, - * a new one is created and stored in the cache. - * - * @param sessionId The unique identifier for the session that requires a lock. - * - * @return The Mutex associated with the given session ID. - */ - operator fun get(sessionId: String): Mutex = sessionLocks.get(sessionId) { Mutex() } -} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManagerConfiguration.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManagerConfiguration.kt deleted file mode 100644 index e06e594..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManagerConfiguration.kt +++ /dev/null @@ -1,19 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.concurrent - -import org.springframework.boot.context.properties.ConfigurationProperties -import org.springframework.context.annotation.Configuration - -/** - * Configuration class for the LockManager, which holds settings related to - * session locks such as time-to-live (TTL) for the locks. - * - * This class is bound to properties with the prefix "pillarbox.monitoring.lock" - * in the application's configuration files. - * - * @param ttl The time-to-live in seconds for each lock. Defaults to 30 seconds. - */ -@Configuration -@ConfigurationProperties(prefix = "pillarbox.monitoring.lock") -data class LockManagerConfiguration( - var ttl: Long = 30, -) diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt index b9bad95..ac5f5f8 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt @@ -1,31 +1,39 @@ package ch.srgssr.pillarbox.monitoring.event -import ch.srgssr.pillarbox.monitoring.concurrent.LockManager -import ch.srgssr.pillarbox.monitoring.event.config.SseClientConfigurationProperties +import ch.srgssr.pillarbox.monitoring.benchmark.StatsTracker +import ch.srgssr.pillarbox.monitoring.benchmark.timed +import ch.srgssr.pillarbox.monitoring.cache.LRUCache import ch.srgssr.pillarbox.monitoring.event.model.EventRequest -import ch.srgssr.pillarbox.monitoring.exception.RetryExhaustedException +import ch.srgssr.pillarbox.monitoring.event.repository.EventRepository +import ch.srgssr.pillarbox.monitoring.flow.chunked +import ch.srgssr.pillarbox.monitoring.log.info import ch.srgssr.pillarbox.monitoring.log.logger +import ch.srgssr.pillarbox.monitoring.log.trace +import ch.srgssr.pillarbox.monitoring.opensearch.saveAllSuspend import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.retryWhen import org.springframework.stereotype.Service import org.springframework.web.reactive.function.client.WebClient -import org.springframework.web.reactive.function.client.bodyToFlux +import org.springframework.web.reactive.function.client.bodyToFlow /** * Service responsible for managing a Server-Sent Events (SSE) connection to the event dispatcher service. * It handles incoming events, and manages retry behavior in case of connection failures. * - * @property eventService The service used to handle incoming events. + * @property eventRepository The repository where the events are stored. * @property properties The SSE client configuration containing the URI and retry settings. - * @property lockManager The session based lock manager. */ @Service class EventDispatcherClient( - private val eventService: EventService, - private val properties: SseClientConfigurationProperties, - private val lockManager: LockManager, + private val eventRepository: EventRepository, + private val properties: EventDispatcherClientConfiguration, ) { private companion object { /** @@ -34,50 +42,63 @@ class EventDispatcherClient( private val logger = logger() } - /** + private val sessionCache: LRUCache = LRUCache(properties.cacheSize) + + /**a * Starts the SSE client, connecting to the configured SSE endpoint. It handles incoming events by * delegating to the appropriate event handling methods and manages retries in case of connection failures. */ - fun start() = + fun start(): Job = WebClient .create(properties.uri) .get() .retrieve() - .bodyToFlux() + .bodyToFlow() .retryWhen( - properties.retry - .create() - .doBeforeRetry { - logger.warn("Retrying SSE connection...") - }.onRetryExhaustedThrow { _, retrySignal -> - RetryExhaustedException( - "Retries exhausted after ${retrySignal.totalRetries()} attempts", - retrySignal.failure(), + properties.sseRetry.toRetryWhen( + onRetry = { cause, attempt, delayMillis -> + logger.warn( + "Retrying after failure: ${cause.message}. " + + "Attempt ${attempt + 1}. Waiting for ${delayMillis}ms", ) }, - ).doOnNext { CoroutineScope(Dispatchers.IO).launch { handleEvent(it) } } - .doOnError { error -> - if (error !is RetryExhaustedException) { - logger.error("An error occurred while processing the event.", error) - } - } + ), + ).onEach { StatsTracker.increment("incomingEvents") } + .buffer( + capacity = properties.bufferCapacity, + onBufferOverflow = BufferOverflow.DROP_OLDEST, + ).chunked(properties.saveChunkSize) + .onEach { logger.info { "Start processing next ${it.size} events" } } + .map { events -> + StatsTracker.increment("nonDroppedEvents", events.size) - private suspend fun handleEvent(eventRequest: EventRequest) { - lockManager[eventRequest.sessionId].withLock { - when (eventRequest.eventName) { - "START" -> handleStartEvent(eventRequest) - else -> handleNonStartEvent(eventRequest) - } - } - } + val startEvents = + events + .filter { it.eventName == "START" } + .onEach { sessionCache.put(it.sessionId, it.data) } - private suspend fun handleStartEvent(eventRequest: EventRequest) { - eventService.updateSessionData(eventRequest) - eventService.saveEvent(eventRequest) - } + val nonStartEvents = + events + .filter { it.eventName != "START" } + .onEach { it.session = sessionCache.get(it.sessionId) } + .filter { it.session != null } + .also { StatsTracker.increment("cacheHits", it.size) } + + startEvents + nonStartEvents + }.onEach { logger.info { "Adding ${it.size} events to next save batch" } } + .onEach { this.saveEvents(it) } + .launchIn(CoroutineScope(Dispatchers.Default)) + + @Suppress("TooGenericExceptionCaught") + private suspend fun saveEvents(events: List) { + try { + logger.trace { "Saving events $events" } - private suspend fun handleNonStartEvent(eventRequest: EventRequest) { - eventRequest.session = eventService.findSession(eventRequest.sessionId)?.data - eventService.saveEvent(eventRequest) + timed("EventRepository.saveEvents") { + eventRepository.saveAllSuspend(events) + } + } catch (e: Exception) { + logger.error("An error occurred while saving the current batch", e) + } } } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClientConfiguration.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClientConfiguration.kt new file mode 100644 index 0000000..572a3b3 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClientConfiguration.kt @@ -0,0 +1,26 @@ +package ch.srgssr.pillarbox.monitoring.event + +import ch.srgssr.pillarbox.monitoring.event.config.RetryProperties +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.boot.context.properties.NestedConfigurationProperty + +/** + * Configuration class for Server-Sent Events (SSE) client settings in the application. + * This class is mapped to properties prefixed with `pillarbox.monitoring.dispatch` in + * the application's configuration files. + * + * @property uri The URI for the SSE endpoint. Defaults to "http://localhost:8080". + * @property cacheSize The maximum number of events to cache in memory. + * @property bufferCapacity The size of the buffer used for handling backpressure. + * @property saveChunkSize The size of event chunks to save at a time. + * @property sseRetry Configuration for retry behavior in case of connection failures. + */ +@ConfigurationProperties(prefix = "pillarbox.monitoring.dispatch") +data class EventDispatcherClientConfiguration( + val uri: String = "http://localhost:8080", + val cacheSize: Int = 200_000, + val bufferCapacity: Int = 30_000, + val saveChunkSize: Int = 6_000, + @NestedConfigurationProperty + val sseRetry: RetryProperties = RetryProperties(), +) diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventService.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventService.kt deleted file mode 100644 index 039178d..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventService.kt +++ /dev/null @@ -1,70 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.event - -import ch.srgssr.pillarbox.monitoring.event.model.EventRequest -import ch.srgssr.pillarbox.monitoring.event.repository.ActionRepository -import ch.srgssr.pillarbox.monitoring.health.Benchmarked -import ch.srgssr.pillarbox.monitoring.log.debug -import ch.srgssr.pillarbox.monitoring.log.logger -import org.springframework.stereotype.Service - -/** - * Service responsible for handling operations related to events, such as finding sessions, saving events, - * and updating session data. - * - * @property actionRepository The repository used to access and manipulate event-related data. - */ -@Service -class EventService( - private val actionRepository: ActionRepository, -) { - private companion object { - /** - * Logger instance for logging within this service. - */ - private val logger = logger() - } - - /** - * Finds the "START" event associated with the given session ID. - * The method is benchmarked to monitor its execution time. - * - * @param sessionId The ID of the session to be retrieved. - * @return The event representing the "START" event for the given session ID, or `null` if not found. - */ - @Benchmarked - fun findSession(sessionId: String): EventRequest? { - logger.debug { "Fetching start event for $sessionId" } - return actionRepository.findBySessionIdAndEventName( - sessionId, - "START", - ) - } - - /** - * Saves the provided event request to the database. - * The method is benchmarked to monitor its execution time. - * - * @param eventRequest The event request to be saved. - */ - @Benchmarked - fun saveEvent(eventRequest: EventRequest) { - logger.debug { "Saving event ${eventRequest.sessionId} and type ${eventRequest.eventName}" } - actionRepository.save(eventRequest) - } - - /** - * Updates session data for all events associated with the given session ID that do not yet have session data. - * The session data is updated with the data from the provided start event. - * The method is benchmarked to monitor its execution time. - * - * @param startEvent The "START" event containing the session data to be used for the update. - */ - @Benchmarked - fun updateSessionData(startEvent: EventRequest) { - logger.debug { "Updating session: ${startEvent.sessionId}" } - actionRepository - .findAllBySessionIdAndSessionIsNull(startEvent.sessionId) - .onEach { e -> e.session = startEvent.data } - .forEach(actionRepository::save) - } -} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/RetryProperties.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/RetryProperties.kt new file mode 100644 index 0000000..95a03bd --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/RetryProperties.kt @@ -0,0 +1,60 @@ +package ch.srgssr.pillarbox.monitoring.event.config + +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.FlowCollector +import reactor.util.retry.Retry +import reactor.util.retry.RetryBackoffSpec +import java.time.Duration + +/** + * Configuration class for retry behavior. + * + * @property maxAttempts The maximum number of retry attempts. Defaults to 5. + * @property initialInterval The initial interval between retry attempts. Defaults to 5 seconds. + * @property maxInterval The maximum interval between retry attempts. Defaults to 1 minute. + */ +data class RetryProperties( + val maxAttempts: Long = 5, + val initialInterval: Duration = Duration.ofSeconds(5), + val maxInterval: Duration = Duration.ofMinutes(1), +) { + /** + * Creates a [RetryBackoffSpec] based on the retry properties. + * This specification defines the backoff strategy for retries, + * including the number of attempts and interval timings. + * + * @return A configured [RetryBackoffSpec] for use with Reactor's retry mechanism. + */ + fun create(): RetryBackoffSpec = + Retry + .backoff(maxAttempts, initialInterval) + .maxBackoff(maxInterval) + + /** + * Creates a `retryWhen` callback function for Kotlin Flow. + * + * @param onRetry Callback for logging or handling retry events. It receives the throwable, + * current attempt number, and calculated delay. + * @return A suspendable lambda for `retryWhen` in Kotlin Flow. + */ + fun toRetryWhen( + predicate: (Throwable) -> Boolean = { true }, + onRetry: (Throwable, Long, Long) -> Unit = { _, _, _ -> }, + ): suspend FlowCollector.(cause: Throwable, attempt: Long) -> Boolean = + { cause, attempt -> + run { + if (!predicate(cause) || attempt >= maxAttempts) return@run false + + val delayMillis = calculateBackoff(attempt) + onRetry(cause, attempt, delayMillis) + delay(delayMillis) + + true + } + } + + private fun calculateBackoff(attempt: Long): Long { + val exponentialBackoff = initialInterval.toMillis() * (1L shl attempt.toInt()) + return exponentialBackoff.coerceAtMost(maxInterval.toMillis()) + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/SseClientConfigurationProperties.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/SseClientConfigurationProperties.kt deleted file mode 100644 index 5c147fb..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/config/SseClientConfigurationProperties.kt +++ /dev/null @@ -1,20 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.event.config - -import ch.srgssr.pillarbox.monitoring.common.RetryProperties -import org.springframework.boot.context.properties.ConfigurationProperties -import org.springframework.boot.context.properties.NestedConfigurationProperty - -/** - * Configuration class for Server-Sent Events (SSE) client settings in the application. - * This class is mapped to properties prefixed with `pillarbox.monitoring.dispatch` in - * the application's configuration files. - * - * @property uri The URI for the SSE endpoint. Defaults to "http://localhost:8080". - * @property retry Configuration for retry behavior in case of connection failures. - */ -@ConfigurationProperties(prefix = "pillarbox.monitoring.dispatch") -data class SseClientConfigurationProperties( - val uri: String = "http://localhost:8080", - @NestedConfigurationProperty - val retry: RetryProperties = RetryProperties(), -) diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/model/EventRequest.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/model/EventRequest.kt index c564fce..73b6d28 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/model/EventRequest.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/model/EventRequest.kt @@ -15,6 +15,7 @@ import org.springframework.data.elasticsearch.annotations.DateFormat import org.springframework.data.elasticsearch.annotations.Document import org.springframework.data.elasticsearch.annotations.Field import org.springframework.data.elasticsearch.annotations.FieldType +import org.springframework.data.elasticsearch.annotations.WriteTypeHint /** * Represents an event request stored in the OpenSearch `actions` index. @@ -27,7 +28,12 @@ import org.springframework.data.elasticsearch.annotations.FieldType * @property data Additional data associated with the event. * @property session Session data associated with the event, potentially updated later. */ -@Document(indexName = "events", createIndex = false) +@Document( + indexName = "events", + createIndex = false, + writeTypeHint = WriteTypeHint.FALSE, + storeIdInSource = false, +) data class EventRequest( @Id @JsonIgnore diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/ActionRepository.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/ActionRepository.kt deleted file mode 100644 index 9d4475b..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/ActionRepository.kt +++ /dev/null @@ -1,33 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.event.repository - -import ch.srgssr.pillarbox.monitoring.event.model.EventRequest -import org.springframework.data.elasticsearch.repository.ElasticsearchRepository -import org.springframework.stereotype.Repository - -/** - * Repository interface for managing [EventRequest] entities in an OpenSearch store. - */ -@Repository -interface ActionRepository : ElasticsearchRepository { - /** - * Finds all [EventRequest] entities with the given [sessionId] that do not yet have session data. - * - * @param sessionId The session ID to filter the [EventRequest] entities. - * - * @return A list of [EventRequest] entities that have the specified [sessionId] and no session data. - */ - fun findAllBySessionIdAndSessionIsNull(sessionId: String): List - - /** - * Finds a single [EventRequest] entity by its [sessionId] and [eventName]. - * - * @param sessionId The session ID of the [EventRequest]. - * @param eventName The name of the event. - * - * @return The [EventRequest] entity that matches the given [sessionId] and [eventName], or `null` if none found. - */ - fun findBySessionIdAndEventName( - sessionId: String, - eventName: String, - ): EventRequest? -} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/EventRepository.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/EventRepository.kt new file mode 100644 index 0000000..704248d --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/EventRepository.kt @@ -0,0 +1,11 @@ +package ch.srgssr.pillarbox.monitoring.event.repository + +import ch.srgssr.pillarbox.monitoring.event.model.EventRequest +import org.springframework.data.elasticsearch.repository.ElasticsearchRepository +import org.springframework.stereotype.Repository + +/** + * Repository interface for managing [EventRequest] entities in an OpenSearch store. + */ +@Repository +interface EventRepository : ElasticsearchRepository diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfiguration.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfiguration.kt index f378031..d3d23bb 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfiguration.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfiguration.kt @@ -6,6 +6,7 @@ import org.opensearch.data.client.orhlc.ClientConfiguration import org.opensearch.data.client.orhlc.RestClients import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import java.time.Duration /** * Configuration class for setting up the OpenSearch client. @@ -34,6 +35,7 @@ class OpenSearchConfiguration( .apply { if (properties.isHttps) { usingSsl() + withSocketTimeout(Duration.ofSeconds(10)) } }.build() diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt index 5b487d3..b3b46fc 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt @@ -1,6 +1,6 @@ package ch.srgssr.pillarbox.monitoring.event.repository -import ch.srgssr.pillarbox.monitoring.common.RetryProperties +import ch.srgssr.pillarbox.monitoring.event.config.RetryProperties import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.NestedConfigurationProperty import java.net.URI diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt index fa41ade..8cfe617 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/AliasSetupTask.kt @@ -2,6 +2,7 @@ package ch.srgssr.pillarbox.monitoring.event.setup import ch.srgssr.pillarbox.monitoring.io.loadResourceContent import ch.srgssr.pillarbox.monitoring.log.error +import ch.srgssr.pillarbox.monitoring.log.info import ch.srgssr.pillarbox.monitoring.log.logger import org.springframework.beans.factory.annotation.Qualifier import org.springframework.core.annotation.Order @@ -66,10 +67,10 @@ class AliasSetupTask( .uri(ALIAS_CHECK_PATH) .retrieve() .onStatus(HttpStatusCode::is4xxClientError) { - logger.info("Alias '$ALIAS_NAME' does not exist, creating alias...") + logger.info { "Alias '$ALIAS_NAME' does not exist, creating alias..." } createAlias().then(Mono.empty()) }.onStatus(HttpStatusCode::is2xxSuccessful) { - logger.info("Alias '$ALIAS_NAME' already exists, skipping creation.") + logger.info { "Alias '$ALIAS_NAME' already exists, skipping creation." } Mono.empty() }.toBodilessEntity() @@ -82,7 +83,7 @@ class AliasSetupTask( .bodyValue(indexTemplateJson) .retrieve() .toBodilessEntity() - .doOnSuccess { logger.info("Alias ${ALIAS_NAME} created successfully") } + .doOnSuccess { logger.info { "Alias ${ALIAS_NAME} created successfully" } } .doOnError { e -> logger.error { "Failed to create alias: ${e.message}" } } } } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt index 171fb2a..2ab79b0 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt @@ -1,6 +1,7 @@ package ch.srgssr.pillarbox.monitoring.event.setup import ch.srgssr.pillarbox.monitoring.event.repository.OpenSearchConfigurationProperties +import ch.srgssr.pillarbox.monitoring.log.info import ch.srgssr.pillarbox.monitoring.log.logger import org.springframework.beans.factory.annotation.Qualifier import org.springframework.stereotype.Service @@ -41,7 +42,7 @@ class OpenSearchSetupService( * the retry settings defined in [properties]. If retries are exhausted, the * application will be terminated. */ - fun start() = + fun start(): Mono<*> = checkOpenSearchHealth() .retryWhen( properties.retry.create().doBeforeRetry { @@ -49,7 +50,6 @@ class OpenSearchSetupService( }, ).doOnSuccess { logger.info("OpenSearch is healthy, proceeding with setup...") } .then(runSetupTasks()) - .doOnSuccess { logger.info("All setup tasks are completed, starting SSE client...") } private fun checkOpenSearchHealth(): Mono<*> = webClient @@ -62,7 +62,7 @@ class OpenSearchSetupService( Flux .fromIterable(tasks) .concatMap { task -> - logger.info("Running setup task: ${task::class.simpleName}") + logger.info { "Running setup task: ${task::class.simpleName}" } task.run() }.last() } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/flow/FlowUtils.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/flow/FlowUtils.kt new file mode 100644 index 0000000..0ee0a2d --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/flow/FlowUtils.kt @@ -0,0 +1,41 @@ +package ch.srgssr.pillarbox.monitoring.flow + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow + +/** + * Extension function for [Flow] that groups emitted elements into lists of a specified size. + * + * This function collects elements from the source flow and emits them as chunks of the given size. + * If the number of elements collected is not evenly divisible by the chunk size, the last chunk + * will contain the remaining elements. + * + * @param T The type of elements in the flow. + * @param size The number of elements to include in each chunk. Must be greater than or equal to 1. + * + * @return A new [Flow] that emits lists of elements, each containing at most [size] elements. + * + * @throws IllegalArgumentException If [size] is less than 1. + * + * Example usage: + * ```kotlin + * val numbers = flowOf(1, 2, 3, 4, 5) + * val chunkedFlow = numbers.chunked(2) + * chunkedFlow.collect { println(it) } // Outputs: [1, 2], [3, 4], [5] + * ``` + */ +fun Flow.chunked(size: Int): Flow> { + require(size >= 1) { "Expected positive chunk size, but got $size" } + return flow { + var result: ArrayList? = null + collect { value -> + val acc = result ?: ArrayList(size).also { result = it } + acc.add(value) + if (acc.size == size) { + emit(acc) + result = null + } + } + result?.let { emit(it) } + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkHealthIndicator.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkHealthIndicator.kt deleted file mode 100644 index b51dc3d..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkHealthIndicator.kt +++ /dev/null @@ -1,29 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.health - -import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator -import org.springframework.boot.actuate.health.Health -import org.springframework.boot.actuate.health.HealthIndicator -import org.springframework.stereotype.Component - -/** - * A custom health indicator that reports the health status based on the benchmarking data - * collected by the [BenchmarkingAspect]. This health indicator is conditionally enabled - * based on the presence of the `benchmark` health indicator. - * - * @property benchmarkingAspect The aspect that collects and maintains the moving average - * of execution times for methods annotated with [Benchmarked]. - */ -@Component -@ConditionalOnEnabledHealthIndicator("benchmark") -class BenchmarkHealthIndicator( - private val benchmarkingAspect: BenchmarkingAspect, -) : HealthIndicator { - /** - * Reports the health status of the application. The health status is always `UP`, and - * it includes a detailed map of average execution times for all methods monitored by - * the [BenchmarkingAspect]. - * - * @return A [Health] object representing the current health status and details. - */ - override fun health(): Health = Health.up().withDetail("averages", benchmarkingAspect.averages).build() -} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/Benchmarked.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/Benchmarked.kt deleted file mode 100644 index 90456ee..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/Benchmarked.kt +++ /dev/null @@ -1,20 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.health - -/** - * Annotation used to mark a method for benchmarking. When a method is annotated with [Benchmarked], - * its execution time will be monitored, logged, and included in a moving average calculation by the - * [BenchmarkingAspect]. - * - * This annotation can only be applied to functions and is retained at runtime. - * - * Usage: - * ``` - * @Benchmarked - * fun someMethod() { - * // Method implementation - * } - * ``` - */ -@Target(AnnotationTarget.FUNCTION) -@Retention(AnnotationRetention.RUNTIME) -annotation class Benchmarked diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkingAspect.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkingAspect.kt deleted file mode 100644 index 67d521b..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkingAspect.kt +++ /dev/null @@ -1,60 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.health - -import ch.srgssr.pillarbox.monitoring.log.debug -import ch.srgssr.pillarbox.monitoring.log.logger -import org.aspectj.lang.ProceedingJoinPoint -import org.aspectj.lang.annotation.Around -import org.aspectj.lang.annotation.Aspect -import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator -import org.springframework.stereotype.Component -import java.util.concurrent.ConcurrentHashMap -import kotlin.time.measureTimedValue - -/** - * Aspect for benchmarking the execution time of methods annotated with [Benchmarked]. - * This aspect logs the execution time of the methods and maintains a moving average - * of the execution times. - * - * The aspect is conditionally enabled based on the presence of the `benchmark` health indicator. - */ -@Aspect -@Component -@ConditionalOnEnabledHealthIndicator("benchmark") -class BenchmarkingAspect { - private companion object { - /** - * Logger instance for logging within this Aspect. - */ - private val logger = logger() - } - - private val movingAverages = ConcurrentHashMap() - - /** - * Around advice that logs the execution time of methods annotated with [Benchmarked]. - * The execution time is measured, logged, and added to a moving average calculator. - * - * @param joinPoint The join point representing the method execution. - * - * @return The result of the method execution. - * @throws Throwable if the method execution throws an exception. - */ - @Around("@annotation(Benchmarked)") - fun logExecutionTime(joinPoint: ProceedingJoinPoint): Any? { - val (result, executionTime) = measureTimedValue { joinPoint.proceed() } - val signature = joinPoint.signature.toShortString() - val calculator = movingAverages.computeIfAbsent(signature) { MovingAverageCalculator(10) } - - calculator.add(executionTime.inWholeMilliseconds) - logger.debug { ("$signature took $executionTime") } - - return result - } - - /** - * Provides the average execution times for all monitored methods. - * - * @return A map where the keys are method signatures and the values are the average execution times. - */ - val averages get() = movingAverages.mapValues { it.value.average } -} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/MovingAverageCalculator.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/MovingAverageCalculator.kt deleted file mode 100644 index 3476f57..0000000 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/MovingAverageCalculator.kt +++ /dev/null @@ -1,39 +0,0 @@ -package ch.srgssr.pillarbox.monitoring.health - -import java.util.concurrent.ConcurrentLinkedDeque - -/** - * A class that calculates the moving average of a stream of long values. The calculation is based on a - * fixed-size window of the most recent values. - * - * @property windowSize The size of the window used to calculate the moving average. This defines the number of most - * recent values considered for the average calculation. - */ -internal class MovingAverageCalculator( - private val windowSize: Int, -) { - private val deque = ConcurrentLinkedDeque() - - /** - * Adds a new value to the moving average calculation. If the deque already contains the maximum number - * of elements specified by [windowSize], the oldest element is removed to make room for the new one. - * - * This method is synchronized to ensure thread safety, as the deque operations may be accessed by multiple - * threads concurrently. - * - * @param n The value to be added to the moving average calculation. - */ - @Synchronized - fun add(n: Long) { - deque.takeIf { it.size > windowSize }?.removeLast() - deque.addFirst(n) - } - - /** - * Calculates the moving average of the values currently in the deque. The average is computed over the - * values in the deque, which contains at most [windowSize] elements. - * - * @return The average of the values currently stored in the deque. If the deque is empty, the result will be `NaN`. - */ - val average get() = deque.average() -} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/log/LoggingUtils.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/log/LoggingExtensions.kt similarity index 79% rename from src/main/kotlin/ch/srgssr/pillarbox/monitoring/log/LoggingUtils.kt rename to src/main/kotlin/ch/srgssr/pillarbox/monitoring/log/LoggingExtensions.kt index 545623f..6f6d2db 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/log/LoggingUtils.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/log/LoggingExtensions.kt @@ -11,7 +11,14 @@ import org.slf4j.LoggerFactory inline fun T.logger(): Logger = LoggerFactory.getLogger(T::class.java) /** - * Logs a debug message if the debug letvel is enabled for the logger. + * Logs a trace message if the trace level is enabled for the logger. + * + * @param lazyMessage A lambda function that generates the log message only if trace logging is enabled. + */ +inline fun Logger.trace(lazyMessage: () -> String) = isTraceEnabled.takeIf { it }?.let { trace(lazyMessage()) } + +/** + * Logs a debug message if the debug level is enabled for the logger. * * @param lazyMessage A lambda function that generates the log message only if debug logging is enabled. */ diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/opensearch/ElasticsearchRepositoryExtensions.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/opensearch/ElasticsearchRepositoryExtensions.kt new file mode 100644 index 0000000..91c4cc5 --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/opensearch/ElasticsearchRepositoryExtensions.kt @@ -0,0 +1,26 @@ +package ch.srgssr.pillarbox.monitoring.opensearch + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext +import org.springframework.data.elasticsearch.repository.ElasticsearchRepository +import kotlin.coroutines.CoroutineContext + +/** + * Extension function for [ElasticsearchRepository] to provide a coroutine-based API + * for saving multiple entities asynchronously. + * + * @param T The type of the entity managed by the repository. + * @param ID The type of the entity's ID. + * @param S A subtype of [T] representing the entities to be saved. + * @param entities The collection of entities to be saved. + * @param context The context in which the coroutine will be executed. [Dispatchers.IO] by default. + * + * @return A list of the saved entities. + */ +suspend fun ElasticsearchRepository.saveAllSuspend( + entities: Iterable, + context: CoroutineContext = Dispatchers.IO, +): List = + withContext(context) { + saveAll(entities).toList() + } diff --git a/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/src/main/resources/META-INF/additional-spring-configuration-metadata.json deleted file mode 100644 index c66d4b4..0000000 --- a/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "properties": [ - { - "name": "management.health.benchmark.enabled", - "type": "java.lang.Boolean", - "description": "Enable or disable benchmark health indicator.", - "defaultValue": false - } - ] -} diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml new file mode 100644 index 0000000..88d4d2b --- /dev/null +++ b/src/main/resources/application-local.yml @@ -0,0 +1,4 @@ +pillarbox.monitoring.dispatch: + buffer-capacity: 1_000 + cache-size: 5_000 + save-chunk-size: 1 diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml new file mode 100644 index 0000000..e289687 --- /dev/null +++ b/src/main/resources/application-prod.yml @@ -0,0 +1,4 @@ +pillarbox.monitoring.dispatch: + buffer-capacity: 30_000 + cache-size: 200_000 + save-chunk-size: 6_000 diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index fc84f5c..a940887 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,23 +1,10 @@ spring: + main.web-application-type: none threads.virtual.enabled: true application.name: pillarbox-monitoring-transfer jackson.deserialization: - fail-on-null-for-primitives: true - + fail-on-null-for-primitives: true pillarbox.monitoring: dispatch.uri: "http://localhost:8080/events" opensearch.uri: "http://localhost:9200" - -server: - port: 8081 - http2.enabled: true - compression.enabled: true - -management: - endpoints.web.exposure.include: health - endpoint.health.show-details: always - health.benchmark.enabled: true - -logging.level: - ch.srgssr.pillarbox.monitoring: INFO diff --git a/src/main/resources/opensearch/index.json b/src/main/resources/opensearch/index.json index 9c13b5d..a46387b 100644 --- a/src/main/resources/opensearch/index.json +++ b/src/main/resources/opensearch/index.json @@ -5,3 +5,40 @@ } } } + +{ + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": 1737719669146, + "lte": 1737721469146, + "format": "epoch_millis" + } + } + }, + { + "query_string": { + "analyze_wildcard": true, + "query": "event_name:START" + } + } + ] + } + }, + "aggs": { + "2": { + "terms": { + "field": "data.media.id", + "size": 10, + "order": { + "_count": "desc" + }, + "min_doc_count": 1 + }, + "aggs": {} + } + } +} diff --git a/src/main/resources/opensearch/index_template.json b/src/main/resources/opensearch/index_template.json index 3ca7cb9..ab26250 100644 --- a/src/main/resources/opensearch/index_template.json +++ b/src/main/resources/opensearch/index_template.json @@ -1,17 +1,345 @@ { - "index_patterns": ["events*"], + "index_patterns": [ + "events*" + ], "priority": 100, "template": { "settings": { "number_of_shards": 3, "number_of_replicas": 1, + "refresh_interval": "60s", "plugins.index_state_management.rollover_alias": "events" }, "mappings": { + "dynamic": false, "properties": { + "session_id": { + "type": "keyword", + "ignore_above": 256, + "eager_global_ordinals": true + }, + "event_name": { + "type": "keyword", + "ignore_above": 256 + }, + "user_ip": { + "type": "keyword", + "ignore_above": 256 + }, + "version": { + "type": "long" + }, "@timestamp": { "type": "date", "format": "epoch_millis" + }, + "data": { + "properties": { + "airplay": { + "type": "boolean" + }, + "bandwidth": { + "type": "long" + }, + "bitrate": { + "type": "long" + }, + "browser": { + "properties": { + "name": { + "type": "keyword", + "ignore_above": 256 + }, + "user_agent": { + "type": "keyword", + "ignore_above": 1024 + }, + "version": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "buffered_duration": { + "type": "long" + }, + "device": { + "properties": { + "id": { + "type": "keyword", + "ignore_above": 256 + }, + "model": { + "type": "keyword", + "ignore_above": 256 + }, + "type": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "duration": { + "type": "long" + }, + "frame_drops": { + "type": "long" + }, + "log": { + "type": "keyword", + "ignore_above": 256 + }, + "media": { + "properties": { + "asset_url": { + "type": "keyword", + "ignore_above": 2048 + }, + "id": { + "type": "keyword", + "ignore_above": 256, + "eager_global_ordinals": true + }, + "metadata_url": { + "type": "keyword", + "ignore_above": 2048 + }, + "origin": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "message": { + "type": "keyword", + "ignore_above": 256 + }, + "name": { + "type": "keyword", + "ignore_above": 256 + }, + "os": { + "properties": { + "name": { + "type": "keyword", + "ignore_above": 256 + }, + "version": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "playback_duration": { + "type": "long" + }, + "player": { + "properties": { + "name": { + "type": "keyword", + "ignore_above": 256 + }, + "platform": { + "type": "keyword", + "ignore_above": 256 + }, + "version": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "position": { + "type": "long" + }, + "position_timestamp": { + "type": "long" + }, + "qoe_timings": { + "properties": { + "asset": { + "type": "float" + }, + "metadata": { + "type": "float" + }, + "total": { + "type": "float" + } + } + }, + "qos_timings": { + "properties": { + "asset": { + "type": "float" + }, + "metadata": { + "type": "float" + }, + "token": { + "type": "float" + } + } + }, + "robot": { + "type": "boolean" + }, + "screen": { + "properties": { + "height": { + "type": "float" + }, + "width": { + "type": "float" + } + } + }, + "stall": { + "properties": { + "count": { + "type": "long" + }, + "duration": { + "type": "long" + } + } + }, + "stream_type": { + "type": "keyword", + "ignore_above": 256 + }, + "url": { + "type": "keyword", + "ignore_above": 2048 + }, + "vpn": { + "type": "boolean" + } + } + }, + "session": { + "properties": { + "browser": { + "properties": { + "name": { + "type": "keyword", + "ignore_above": 256 + }, + "user_agent": { + "type": "keyword", + "ignore_above": 1024 + }, + "version": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "device": { + "properties": { + "id": { + "type": "keyword", + "ignore_above": 256 + }, + "model": { + "type": "keyword", + "ignore_above": 256 + }, + "type": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "media": { + "properties": { + "asset_url": { + "type": "keyword", + "ignore_above": 2048 + }, + "id": { + "type": "keyword", + "ignore_above": 256, + "eager_global_ordinals": true + }, + "metadata_url": { + "type": "keyword", + "ignore_above": 2048 + }, + "origin": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "os": { + "properties": { + "name": { + "type": "keyword", + "ignore_above": 256 + }, + "version": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "player": { + "properties": { + "name": { + "type": "keyword", + "ignore_above": 256 + }, + "platform": { + "type": "keyword", + "ignore_above": 256 + }, + "version": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "qoe_timings": { + "properties": { + "asset": { + "type": "float" + }, + "metadata": { + "type": "float" + }, + "total": { + "type": "float" + } + } + }, + "qos_timings": { + "properties": { + "asset": { + "type": "float" + }, + "metadata": { + "type": "float" + }, + "token": { + "type": "float" + } + } + }, + "robot": { + "type": "boolean" + }, + "screen": { + "properties": { + "height": { + "type": "float" + }, + "width": { + "type": "float" + } + } + } + } } } } diff --git a/src/main/resources/opensearch/ism_policy.json b/src/main/resources/opensearch/ism_policy.json index 96f8ebb..baebc3f 100644 --- a/src/main/resources/opensearch/ism_policy.json +++ b/src/main/resources/opensearch/ism_policy.json @@ -18,7 +18,7 @@ { "state_name": "warm", "conditions": { - "min_index_age": "7d" + "min_index_age": "3d" } } ] @@ -36,7 +36,7 @@ { "state_name": "delete", "conditions": { - "min_index_age": "14d" + "min_index_age": "7d" } } ] diff --git a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/cache/LRUCacheTest.kt b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/cache/LRUCacheTest.kt new file mode 100644 index 0000000..0919779 --- /dev/null +++ b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/cache/LRUCacheTest.kt @@ -0,0 +1,42 @@ +package ch.srgssr.pillarbox.monitoring.cache + +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.shouldBe + +class LRUCacheTest : + ShouldSpec({ + should("retrieve values by key") { + val cache = LRUCache(3) + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + + cache.get(1) shouldBe "one" + cache.get(2) shouldBe "two" + cache.get(3) shouldBe "three" + } + + should("evict least recently used item when capacity is exceeded") { + val cache = LRUCache(3) + cache.put(1, "one") + cache.put(2, "two") + cache.put(3, "three") + + // Access item 1 to update the order + cache.get(1) + cache.put(4, "four") // This should evict key 2 + + cache.get(1) shouldBe "one" + cache.get(3) shouldBe "three" + cache.get(4) shouldBe "four" + cache.get(2) shouldBe null // Key 2 should have been evicted + } + + should("replace the value if the key already exists") { + val cache = LRUCache(3) + cache.put(1, "one") + cache.put(1, "uno") // Update the value for key 1 + + cache.get(1) shouldBe "uno" + } + }) diff --git a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/model/EventRequestTest.kt b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/model/EventRequestTest.kt index c111d84..63a8e1a 100644 --- a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/model/EventRequestTest.kt +++ b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/event/model/EventRequestTest.kt @@ -7,8 +7,10 @@ import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.ShouldSpec import io.kotest.matchers.shouldBe import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ActiveProfiles @SpringBootTest +@ActiveProfiles("test") class EventRequestTest( private val objectMapper: ObjectMapper, ) : ShouldSpec({