feat: consolidate event processing and optimize application performance
The event processing system has been updated to improve readability and efficiency:

- Switched from Reactor to Kotlin Flow for event handling to improve readability.
- Events are now saved in batches, reducing concurrency overhead and improving performance (see the
  first sketch below).
- Removed the reconciliation logic in favour of a more aggressive in-memory cache approach.
- Replaced Caffeine with a simple LinkedHashMap-based cache that supports an LRU eviction policy
  (see the second sketch below).
- Added configuration parameters to adjust the cache size, buffer size and batch chunk size. These
  properties can also be overridden with environment variables.
- Removed Actuator: benchmarks are now logged, reducing memory consumption and eliminating the need
  to start a server.
- Updated dependencies to the latest compatible versions.
- Replaced dynamic mapping in OpenSearch with an explicit, handcrafted mapping for document fields.
  This change reduces storage requirements and eliminates unnecessary fields, making queries more
  consistent and efficient.
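
As a rough illustration of the batched persistence described in the list above, here is a minimal
Kotlin Flow sketch. It is only a sketch: the `batched` operator, the `saveAll` callback and the
default sizes are illustrative assumptions, not the code introduced by this commit.

```kotlin
import kotlinx.coroutines.flow.*

// Illustrative batching operator: groups upstream values into fixed-size chunks
// and flushes the remainder when the upstream flow completes.
fun <T> Flow<T>.batched(size: Int): Flow<List<T>> =
    flow {
        val batch = mutableListOf<T>()
        collect { value ->
            batch += value
            if (batch.size >= size) {
                emit(batch.toList())
                batch.clear()
            }
        }
        if (batch.isNotEmpty()) emit(batch.toList())
    }

// Illustrative consumer: decouple the SSE producer with a buffer, then persist in batches.
suspend fun <T> consumeInBatches(
    events: Flow<T>,
    bufferCapacity: Int = 1_000,
    batchSize: Int = 500,
    saveAll: suspend (List<T>) -> Unit,
) {
    events
        .buffer(bufferCapacity)
        .batched(batchSize)
        .collect { batch -> saveAll(batch) }
}
```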

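Similarly, a minimal sketch of an LRU cache built on LinkedHashMap with a configurable maximum
size, as described above; the class and member names are illustrative, not taken from this commit.

```kotlin
// Illustrative LRU cache: LinkedHashMap in access-order mode (third constructor
// argument) evicts the least recently used entry once maxSize is exceeded.
class LruCache<K, V>(private val maxSize: Int) {
    private val map =
        object : LinkedHashMap<K, V>(16, 0.75f, true) {
            override fun removeEldestEntry(eldest: MutableMap.MutableEntry<K, V>?): Boolean = size > maxSize
        }

    @Synchronized
    operator fun get(key: K): V? = map[key]

    @Synchronized
    operator fun set(key: K, value: V) {
        map[key] = value
    }
}
```

Eviction here is purely size-based, matching the LRU policy mentioned in the commit message.
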
Co-authored-by: Gaëtan Muller <[email protected]>
jboix and MGaetan89 committed Jan 29, 2025
1 parent b2e20f0 commit 4b202ca
Showing 41 changed files with 961 additions and 498 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -18,4 +18,4 @@ COPY --from=build /app/build/libs/app.jar app.jar
RUN java -Djarmode=layertools -jar app.jar extract

# JVM memory and garbage collection optimizations for containers
ENTRYPOINT ["java", "-Dsun.net.inetaddr.ttl=60", "-Dsun.net.inetaddr.negative.ttl=10", "-Xms512m", "-Xmx1024m", "-XX:+UseG1GC", "-XX:MaxGCPauseMillis=200", "-XX:+UseContainerSupport", "-jar", "/app.jar"]
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -Dsun.net.inetaddr.ttl=60 -Dsun.net.inetaddr.negative.ttl=10 -XX:+UseContainerSupport --add-opens java.base/java.math=ALL-UNNAMED -jar /app.jar"]
26 changes: 6 additions & 20 deletions README.md
@@ -61,9 +61,6 @@ Alternatively, you can build and run the application using Docker:
This project is a Kotlin-based Spring Boot application designed to connect to a Server-Sent Events (
SSE) endpoint, process incoming events, and store data in an OpenSearch index.

It leverages Spring Boot's reactive WebFlux framework and integrates custom health indicators using
Spring Boot Actuator to provide insights into application performance.

### System Flow Overview

The main loop of this service is illustrated in the following diagram:
@@ -72,15 +69,13 @@ The main loop of this service is illustrated in the following diagram:
sequenceDiagram
participant DataTransfer
participant SSEEndpoint
participant LockManager
participant OpenSearch
DataTransfer ->> OpenSearch: Initialize index
DataTransfer ->> SSEEndpoint: Connects and listens to events
loop Listen for events
SSEEndpoint --) DataTransfer: Send Event
DataTransfer ->> LockManager: Acquire session lock
DataTransfer ->> OpenSearch: Store event
DataTransfer -->> LockManager: Release session lock
end
```

@@ -91,14 +86,10 @@ system:

- [PillarboxDataTransferApplication.kt][main-entry-point]: The main entry point of the application
that bootstraps and configures the service.
- [BenchmarkHealthIndicator.kt][health-indicator]: Monitors the performance of key operations,
providing real-time health metrics for the application.
- [LockManager.kt][lock-manager]: Ensures concurrency control by managing locks for different
sessions, enabling thread-safe operations.
- [SetupService.kt][setup-service]: Manages the initial setup of the OpenSearch index and the
application’s configuration for SSE processing.
- [SseClient.kt][sse-client]: Listens to the SSE endpoint, handling incoming events and managing
retries in case of connection failures.
- [OpenSearchSetupService.kt][setup-service]: Manages the initial setup of the OpenSearch index and
the application’s configuration for SSE processing.
- [EventDispatcherClient.kt][sse-client]: Listens to the SSE endpoint, handling incoming events and
managing retries in case of connection failures.

Here’s a more concise description of the GitHub Actions setup without listing the steps:

@@ -162,11 +153,6 @@ Refer to our [Contribution Guide](docs/CONTRIBUTING.md) for more detailed inform
This project is licensed under the [MIT License](LICENSE).

[main-entry-point]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/PillarboxDataTransferApplication.kt
[setup-service]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/setup/OpenSearchSetupService.kt
[sse-client]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt

[health-indicator]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/health/BenchmarkHealthIndicator.kt

[lock-manager]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/concurrent/LockManager.kt

[setup-service]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SetupService.kt

[sse-client]: src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/SseClient.kt
23 changes: 12 additions & 11 deletions build.gradle.kts
@@ -4,11 +4,12 @@ import java.util.Properties
plugins {
kotlin("jvm") version "2.0.10"
kotlin("plugin.spring") version "2.0.10"
id("org.springframework.boot") version "3.3.4"
id("io.spring.dependency-management") version "1.1.6"
id("org.springframework.boot") version "3.4.1"
id("io.spring.dependency-management") version "1.1.7"
id("io.gitlab.arturbosch.detekt") version "1.23.7"
id("org.jlleitschuh.gradle.ktlint") version "12.1.1"
id("org.jetbrains.kotlinx.kover") version "0.8.3"
id("org.jlleitschuh.gradle.ktlint") version "12.1.2"
id("org.jetbrains.kotlinx.kover") version "0.9.0"
id("com.github.ben-manes.versions") version "0.51.0"
}

group = "ch.srgssr.pillarbox"
@@ -24,23 +25,23 @@ repositories {
}

dependencies {
implementation("org.springframework.boot:spring-boot-starter-aop")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.opensearch.client:spring-data-opensearch-starter:1.5.3")
// Dependencies
implementation("org.opensearch.client:spring-data-opensearch-starter:1.6.0")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("nl.basjes.parse.useragent:yauaa:7.29.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
implementation("com.github.ben-manes.caffeine:caffeine")
implementation("nl.basjes.parse.useragent:yauaa:7.28.1")

// Test Dependencies
testImplementation("io.kotest:kotest-runner-junit5:5.9.1")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.kotest.extensions:kotest-extensions-spring:1.3.0")
testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
testImplementation("io.mockk:mockk:1.13.13")
testImplementation("com.squareup.okhttp3:mockwebserver")
testImplementation("com.squareup.okhttp3:okhttp")
testImplementation("com.squareup.okhttp3:mockwebserver:4.12.0")
testImplementation("com.squareup.okhttp3:okhttp:4.12.0")
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
}

@@ -2,11 +2,13 @@ package ch.srgssr.pillarbox.monitoring

import ch.srgssr.pillarbox.monitoring.event.EventDispatcherClient
import ch.srgssr.pillarbox.monitoring.event.setup.OpenSearchSetupService
import ch.srgssr.pillarbox.monitoring.exception.RetryExhaustedException
import ch.srgssr.pillarbox.monitoring.log.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.context.annotation.Profile
@@ -43,33 +45,22 @@ class DataTransferApplicationRunner(
*
* @param args Application arguments.
*/
override fun run(args: ApplicationArguments?) {
openSearchSetupService.start().subscribe(
// Success
{ this.startSseClient() },
// Error
{
logger.error("Failed to connect to OpenSearch:", it)
CoroutineScope(Dispatchers.IO).launch {
terminationService.terminateApplication()
}
},
)
}
override fun run(args: ApplicationArguments?) =
runBlocking {
try {
openSearchSetupService
.start()
.asFlow()
.catch {
logger.error("Failed to connect to OpenSearch:", it)
throw it
}.launchIn(CoroutineScope(Dispatchers.Default))
.join()

private fun startSseClient() {
eventDispatcherClient.start().subscribe(
// Success
{ },
// Error
{
if (it is RetryExhaustedException) {
logger.error("Failed to connect to SSE after retries, terminating application.", it)
CoroutineScope(Dispatchers.IO).launch {
terminationService.terminateApplication()
}
}
},
)
}
logger.info("All setup tasks are completed, starting SSE client...")
eventDispatcherClient.start().join()
} finally {
terminationService.terminateApplication()
}
}
}
@@ -4,12 +4,14 @@ import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration
import org.springframework.boot.context.properties.ConfigurationPropertiesScan
import org.springframework.boot.runApplication
import org.springframework.scheduling.annotation.EnableScheduling

/**
* Main entry point for the Pillarbox QoS Data Transfer application.
*/
@SpringBootApplication(exclude = [ElasticsearchDataAutoConfiguration::class])
@ConfigurationPropertiesScan
@EnableScheduling
class PillarboxDataTransferApplication

/**
@@ -0,0 +1,30 @@
package ch.srgssr.pillarbox.monitoring.benchmark

import ch.srgssr.pillarbox.monitoring.log.info
import ch.srgssr.pillarbox.monitoring.log.logger
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit

/**
* A scheduled logger that periodically logs the average execution times
* and stats regarding the number of processed events.
*/
@Component
class BenchmarkScheduledLogger {
private companion object {
/**
* Logger instance for logging within this component.
*/
private val logger = logger()
}

/**
* The scheduled logging function, executes every minute.
*/
@Scheduled(fixedRate = 1, timeUnit = TimeUnit.MINUTES)
fun logBenchmarkAverages() {
logger.info { "Benchmark averages: ${TimeTracker.averages}" }
logger.info { "Latest stats per minute: ${StatsTracker.getAndResetAll()}" }
}
}
@@ -0,0 +1,41 @@
package ch.srgssr.pillarbox.monitoring.benchmark

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.LongAdder

/**
* A thread-safe class for calculating the moving average of a stream of long values.
*
* This implementation accumulates values until the average is calculated, at which point
* the internal state (sum and count) is reset.
*/
internal class MovingAverageCalculator {
private val sum = LongAdder()
private val count = AtomicLong()

/**
* Adds a new value to the moving average calculation.
*
* @param n The value to be added to the moving average calculation.
*/
fun add(n: Long) {
sum.add(n)
count.incrementAndGet()
}

/**
* Calculates the average of the accumulated values and resets the internal state.
*
* This method atomically retrieves and resets both the accumulated sum and count.
* If no values have been added, the method returns `Double.NaN`.
*
* @return The average of the accumulated values, or `NaN` if no values were added.
*/
val average: Double
get() =
synchronized(this) {
val totalCount = count.getAndSet(0)
val total = sum.sumThenReset().toDouble()
if (totalCount == 0L) Double.NaN else total / totalCount
}
}
@@ -0,0 +1,56 @@
package ch.srgssr.pillarbox.monitoring.benchmark

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

/**
* Utility object for tracking and aggregating statistics.
*
* This class uses a [ConcurrentHashMap] to store statistics counters identified by unique keys.
* It provides methods to increment, retrieve, and reset these counters.
*/
object StatsTracker {
private val stats = ConcurrentHashMap<String, AtomicLong>()

/**
* Increments the count for a specific key by the specified delta.
* If the key does not exist, it is initialized to 0 before incrementing.
*
* @param key The unique identifier for the statistic.
* @param delta The amount to increment the counter by. Defaults to 1.
*/
fun increment(
key: String,
delta: Long = 1,
) {
stats.computeIfAbsent(key) { AtomicLong(0) }.addAndGet(delta)
}

/**
* Increments the count for a specific key by the specified delta.
*
* @param key The unique identifier for the statistic.
* @param delta The amount to increment the counter by, as an [Int].
*/
fun increment(
key: String,
delta: Int,
) {
increment(key, delta.toLong())
}

/**
* Retrieves the current count for a specific key.
*
* @param key The unique identifier for the statistic.
* @return The current count for the key, or 0 if the key does not exist.
*/
operator fun get(key: String): Long = stats[key]?.get() ?: 0L

/**
* Retrieves all current statistics as a map and resets their counters to 0.
*
* @return A map containing the keys and their corresponding counts before reset.
*/
fun getAndResetAll(): Map<String, Long> = stats.mapValues { it.value.getAndSet(0) }
}
@@ -0,0 +1,61 @@
package ch.srgssr.pillarbox.monitoring.benchmark

import ch.srgssr.pillarbox.monitoring.log.logger
import ch.srgssr.pillarbox.monitoring.log.trace
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.measureTimedValue

/**
* Utility object for tracking and logging execution times of code blocks with support for moving averages.
*/
object TimeTracker {
private val logger = logger()

private val movingAverages = ConcurrentHashMap<String, MovingAverageCalculator>()

/**
* Tracks the execution time of a code block.
*
* @param T The return type of the code block.
* @param signature A unique identifier (e.g., method name) for tracking and logging purposes.
* @param block The suspending code block to measure.
*
* @return The result of the code block.
*/
suspend fun <T> track(
signature: String,
block: suspend () -> T,
): T {
val (result, executionTime) = measureTimedValue { block() }
val calculator =
movingAverages.computeIfAbsent(signature) {
MovingAverageCalculator()
}

calculator.add(executionTime.inWholeMilliseconds)
logger.trace { "$signature took $executionTime" }

return result
}

/**
* Provides the average execution times for all monitored methods.
*
* @return A map where the keys are method signatures and the values are the average execution times.
*/
val averages get() = movingAverages.mapValues { it.value.average }
}

/**
* Convenience function to track execution time of a code block using [TimeTracker].
*
* @param T The return type of the code block.
* @param signature A unique identifier (e.g., method name) for tracking and logging purposes.
* @param block The suspending code block to measure.
*
* @return The result of the code block.
*/
suspend fun <T> timed(
signature: String,
block: suspend () -> T,
): T = TimeTracker.track(signature, block)