From b00e85bbbdd0f24913d8194893f84aaf1d49828a Mon Sep 17 00:00:00 2001 From: Josep Boix Requesens Date: Thu, 30 Jan 2025 14:35:24 +0100 Subject: [PATCH] fix: clamp out-of-range long values to prevent indexing errors Some clients send extremely large or small numeric values that Jackson deserializes as `BigInteger`. This causes a parsing error since OpenSearch expects them to be in the long range. To prevent indexing failures, this commit introduces a `ClampingLongConverter` that converts out-of-range `BigInteger` values to `Long.MAX_VALUE` or `Long.MIN_VALUE`, ensuring compatibility while preserving meaningful data. --- .../monitoring/event/EventDispatcherClient.kt | 5 ++-- .../event/repository/ClampingLongConverter.kt | 30 +++++++++++++++++++ .../repository/OpenSearchConfiguration.kt | 16 +++++++++- .../OpenSearchConfigurationProperties.kt | 2 ++ src/main/resources/application-local.yml | 3 ++ src/main/resources/application.yml | 4 --- .../json/ClampingLongConverterTest.kt | 27 +++++++++++++++++ 7 files changed, 80 insertions(+), 7 deletions(-) create mode 100644 src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/ClampingLongConverter.kt create mode 100644 src/test/kotlin/ch/srgssr/pillarbox/monitoring/json/ClampingLongConverterTest.kt diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt index ac5f5f8..4ffc6e5 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/EventDispatcherClient.kt @@ -34,6 +34,7 @@ import org.springframework.web.reactive.function.client.bodyToFlow class EventDispatcherClient( private val eventRepository: EventRepository, private val properties: EventDispatcherClientConfiguration, + webClientBuilder: WebClient.Builder, ) { private companion object { /** @@ -43,14 +44,14 @@ class EventDispatcherClient( } private val sessionCache: LRUCache = LRUCache(properties.cacheSize) + private val webClient = webClientBuilder.baseUrl(properties.uri).build() /**a * Starts the SSE client, connecting to the configured SSE endpoint. It handles incoming events by * delegating to the appropriate event handling methods and manages retries in case of connection failures. */ fun start(): Job = - WebClient - .create(properties.uri) + webClient .get() .retrieve() .bodyToFlow() diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/ClampingLongConverter.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/ClampingLongConverter.kt new file mode 100644 index 0000000..b17918b --- /dev/null +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/ClampingLongConverter.kt @@ -0,0 +1,30 @@ +package ch.srgssr.pillarbox.monitoring.event.repository + +import org.springframework.core.convert.converter.Converter +import java.math.BigInteger + +/** + * A converter that transforms a [BigInteger] into a [Long], ensuring that values + * outside the range of [Long.MIN_VALUE] to [Long.MAX_VALUE] are clamped. + */ +class ClampingLongConverter : Converter { + companion object { + private val MAX_LONG_AS_BIGINT = BigInteger.valueOf(Long.MAX_VALUE) + private val MIN_LONG_AS_BIGINT = BigInteger.valueOf(Long.MIN_VALUE) + } + + /** + * Converts a given [BigInteger] to a [Long], clamping values that exceed the range of a `Long` type. + * + * @param value The [BigInteger] to convert. + * + * @return The equivalent [Long] value, clamped to [Long.MIN_VALUE] or [Long.MAX_VALUE] + * if the input exceeds the representable range. + */ + override fun convert(value: BigInteger): Long = + when { + value > MAX_LONG_AS_BIGINT -> Long.MAX_VALUE + value < MIN_LONG_AS_BIGINT -> Long.MIN_VALUE + else -> value.toLong() + } +} diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfiguration.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfiguration.kt index d3d23bb..312b2fd 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfiguration.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfiguration.kt @@ -6,6 +6,7 @@ import org.opensearch.data.client.orhlc.ClientConfiguration import org.opensearch.data.client.orhlc.RestClients import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions import java.time.Duration /** @@ -35,10 +36,23 @@ class OpenSearchConfiguration( .apply { if (properties.isHttps) { usingSsl() - withSocketTimeout(Duration.ofSeconds(10)) + withSocketTimeout(Duration.ofMillis(properties.timeout)) } }.build() return RestClients.create(clientConfiguration).rest() } + + /** + * Registers custom conversions for OpenSearch. + * + * @return An instance of [ElasticsearchCustomConversions] containing the custom converters. + * + * @see [ClampingLongConverter] + */ + @Bean + override fun elasticsearchCustomConversions(): ElasticsearchCustomConversions = + ElasticsearchCustomConversions( + listOf(ClampingLongConverter()), + ) } diff --git a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt index b3b46fc..ec1db00 100644 --- a/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt +++ b/src/main/kotlin/ch/srgssr/pillarbox/monitoring/event/repository/OpenSearchConfigurationProperties.kt @@ -12,12 +12,14 @@ import java.net.URI * * @property uri The URI of the OpenSearch server. Defaults to `http://localhost:9200`. * @property retry Nested configuration properties for retry settings related to OpenSearch operations. + * @property timeout The default timeout for each connection in milliseconds. 10s by default. */ @ConfigurationProperties(prefix = "pillarbox.monitoring.opensearch") data class OpenSearchConfigurationProperties( val uri: URI = URI("http://localhost:9200"), @NestedConfigurationProperty val retry: RetryProperties = RetryProperties(), + val timeout: Long = 10_000, ) { /** * Retrieves the host and port in the format `host:port` based on the URI. diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index 88d4d2b..e075040 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -2,3 +2,6 @@ pillarbox.monitoring.dispatch: buffer-capacity: 1_000 cache-size: 5_000 save-chunk-size: 1 + +logging.level: + ch.srgssr.pillarbox.monitoring: TRACE diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a940887..07cf7fb 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,7 +4,3 @@ spring: application.name: pillarbox-monitoring-transfer jackson.deserialization: fail-on-null-for-primitives: true - -pillarbox.monitoring: - dispatch.uri: "http://localhost:8080/events" - opensearch.uri: "http://localhost:9200" diff --git a/src/test/kotlin/ch/srgssr/pillarbox/monitoring/json/ClampingLongConverterTest.kt b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/json/ClampingLongConverterTest.kt new file mode 100644 index 0000000..3cb6a84 --- /dev/null +++ b/src/test/kotlin/ch/srgssr/pillarbox/monitoring/json/ClampingLongConverterTest.kt @@ -0,0 +1,27 @@ +package ch.srgssr.pillarbox.monitoring.json + +import ch.srgssr.pillarbox.monitoring.event.repository.ClampingLongConverter +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.shouldBe +import java.math.BigInteger + +class ClampingLongConverterTest : + ShouldSpec({ + val converter = ClampingLongConverter() + should("serialize BigInteger within Long range correctly") { + val result = converter.convert(BigInteger.valueOf(123456789L)) + result shouldBe 123456789 + } + + should("clamp BigInteger larger than Long.MAX_VALUE to Long.MAX_VALUE") { + val bigIntAboveMax = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE) + val result = converter.convert(bigIntAboveMax) + result shouldBe Long.MAX_VALUE + } + + should("clamp BigInteger smaller than Long.MIN_VALUE to Long.MIN_VALUE") { + val bigIntBelowMin = BigInteger.valueOf(Long.MIN_VALUE).subtract(BigInteger.ONE) + val result = converter.convert(bigIntBelowMin) + result shouldBe Long.MIN_VALUE + } + })