Skip to content

Commit

Permalink
fix: clamp out-of-range long values to prevent indexing errors
Browse files Browse the repository at this point in the history
Some clients send extremely large or small numeric values that Jackson deserializes as `BigInteger`.
This causes a parsing error since OpenSearch expects them to be in the long range.

To prevent indexing failures, this commit introduces a `ClampingLongConverter` that converts
out-of-range `BigInteger` values to `Long.MAX_VALUE` or `Long.MIN_VALUE`, ensuring compatibility while
preserving meaningful data.
  • Loading branch information
jboix committed Jan 31, 2025
1 parent 143d2e0 commit b00e85b
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.springframework.web.reactive.function.client.bodyToFlow
class EventDispatcherClient(
private val eventRepository: EventRepository,
private val properties: EventDispatcherClientConfiguration,
webClientBuilder: WebClient.Builder,
) {
private companion object {
/**
Expand All @@ -43,14 +44,14 @@ class EventDispatcherClient(
}

private val sessionCache: LRUCache<String, Any> = LRUCache(properties.cacheSize)
private val webClient = webClientBuilder.baseUrl(properties.uri).build()

/**a
* Starts the SSE client, connecting to the configured SSE endpoint. It handles incoming events by
* delegating to the appropriate event handling methods and manages retries in case of connection failures.
*/
fun start(): Job =
WebClient
.create(properties.uri)
webClient
.get()
.retrieve()
.bodyToFlow<EventRequest>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ch.srgssr.pillarbox.monitoring.event.repository

import org.springframework.core.convert.converter.Converter
import java.math.BigInteger

/**
* A converter that transforms a [BigInteger] into a [Long], ensuring that values
* outside the range of [Long.MIN_VALUE] to [Long.MAX_VALUE] are clamped.
*/
class ClampingLongConverter : Converter<BigInteger, Long> {
companion object {
private val MAX_LONG_AS_BIGINT = BigInteger.valueOf(Long.MAX_VALUE)
private val MIN_LONG_AS_BIGINT = BigInteger.valueOf(Long.MIN_VALUE)
}

/**
* Converts a given [BigInteger] to a [Long], clamping values that exceed the range of a `Long` type.
*
* @param value The [BigInteger] to convert.
*
* @return The equivalent [Long] value, clamped to [Long.MIN_VALUE] or [Long.MAX_VALUE]
* if the input exceeds the representable range.
*/
override fun convert(value: BigInteger): Long =
when {
value > MAX_LONG_AS_BIGINT -> Long.MAX_VALUE
value < MIN_LONG_AS_BIGINT -> Long.MIN_VALUE
else -> value.toLong()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.opensearch.data.client.orhlc.ClientConfiguration
import org.opensearch.data.client.orhlc.RestClients
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions
import java.time.Duration

/**
Expand Down Expand Up @@ -35,10 +36,23 @@ class OpenSearchConfiguration(
.apply {
if (properties.isHttps) {
usingSsl()
withSocketTimeout(Duration.ofSeconds(10))
withSocketTimeout(Duration.ofMillis(properties.timeout))
}
}.build()

return RestClients.create(clientConfiguration).rest()
}

/**
* Registers custom conversions for OpenSearch.
*
* @return An instance of [ElasticsearchCustomConversions] containing the custom converters.
*
* @see [ClampingLongConverter]
*/
@Bean
override fun elasticsearchCustomConversions(): ElasticsearchCustomConversions =
ElasticsearchCustomConversions(
listOf(ClampingLongConverter()),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import java.net.URI
*
* @property uri The URI of the OpenSearch server. Defaults to `http://localhost:9200`.
* @property retry Nested configuration properties for retry settings related to OpenSearch operations.
* @property timeout The default timeout for each connection in milliseconds. 10s by default.
*/
@ConfigurationProperties(prefix = "pillarbox.monitoring.opensearch")
data class OpenSearchConfigurationProperties(
val uri: URI = URI("http://localhost:9200"),
@NestedConfigurationProperty
val retry: RetryProperties = RetryProperties(),
val timeout: Long = 10_000,
) {
/**
* Retrieves the host and port in the format `host:port` based on the URI.
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/application-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ pillarbox.monitoring.dispatch:
buffer-capacity: 1_000
cache-size: 5_000
save-chunk-size: 1

logging.level:
ch.srgssr.pillarbox.monitoring: TRACE
4 changes: 0 additions & 4 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,3 @@ spring:
application.name: pillarbox-monitoring-transfer
jackson.deserialization:
fail-on-null-for-primitives: true

pillarbox.monitoring:
dispatch.uri: "http://localhost:8080/events"
opensearch.uri: "http://localhost:9200"
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ch.srgssr.pillarbox.monitoring.json

import ch.srgssr.pillarbox.monitoring.event.repository.ClampingLongConverter
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.shouldBe
import java.math.BigInteger

class ClampingLongConverterTest :
ShouldSpec({
val converter = ClampingLongConverter()
should("serialize BigInteger within Long range correctly") {
val result = converter.convert(BigInteger.valueOf(123456789L))
result shouldBe 123456789
}

should("clamp BigInteger larger than Long.MAX_VALUE to Long.MAX_VALUE") {
val bigIntAboveMax = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE)
val result = converter.convert(bigIntAboveMax)
result shouldBe Long.MAX_VALUE
}

should("clamp BigInteger smaller than Long.MIN_VALUE to Long.MIN_VALUE") {
val bigIntBelowMin = BigInteger.valueOf(Long.MIN_VALUE).subtract(BigInteger.ONE)
val result = converter.convert(bigIntBelowMin)
result shouldBe Long.MIN_VALUE
}
})

0 comments on commit b00e85b

Please sign in to comment.