I switched from AWS ElastiCache Serverless to AWS ElastiCache Clustered because I kept getting a psubscribe error, and nothing I changed in Spring would remove it.
So I'm now using AWS ElastiCache Clustered. The psubscribe error has gone, but after about 15-20 minutes I consistently get this error:
Do you know how I can find out what is causing it and resolve it?
Thanks in advance
ERROR
Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@43b89ef5 was either previously returned or does not belong to this connection provider
Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@4c4f02d6 was either previously returned or does not belong to this connection provider
2024-12-15T15:41:34.277Z ERROR 1 --- [BFFApplication] [ionShutdownHook] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@530629a0 was either previously returned or does not belong to this connection provider
Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@530629a0 was either previously returned or does not belong to this connection provider
at org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider.releaseAsync(LettucePoolingConnectionProvider.java:192) ~[spring-data-redis-3.3.3.jar!/:3.3.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.releaseAsync(LettuceConnectionFactory.java:1834) ~[spring-data-redis-3.3.3.jar!/:3.3.3]
at org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection$AsyncConnect.lambda$close$3(LettuceReactiveRedisConnection.java:373) ~[spring-data-redis-3.3.3.jar!/:3.3.3]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:45) ~[reactor-core-3.6.9.jar!/:3.6.9]
at reactor.core.publisher.Mono.subscribe(Mono.java:4576) ~[reactor-core-3.6.9.jar!/:3.6.9]
...
at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114) ~[spring-boot-3.3.3.jar!/:3.3.3]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
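To make the message itself easier to reason about, here is a minimal sketch of the kind of bookkeeping that could produce it. This is a simplified model written for illustration, not the actual Spring Data Redis code: `FakeConnection`, `FakePoolException`, `TrackingProvider`, and `demoDoubleRelease` are all hypothetical names. The assumption is that the provider remembers each connection it hands out and rejects any release of a connection it is not currently tracking, which covers both a second release and a connection obtained from a different provider.

```kotlin
// Hypothetical stand-in for a pooled connection; not a Lettuce type.
class FakeConnection(val id: Int)

// Hypothetical stand-in for the PoolException seen in the log above.
class FakePoolException(message: String) : RuntimeException(message)

// Simplified model: the provider tracks every connection it has handed out.
class TrackingProvider {
    private val borrowed = mutableSetOf<FakeConnection>()
    private var nextId = 0

    fun acquire(): FakeConnection {
        val connection = FakeConnection(nextId++)
        borrowed.add(connection)
        return connection
    }

    fun release(connection: FakeConnection) {
        // remove() returns false when the connection is not tracked, which
        // happens if it was already released or came from another provider.
        if (!borrowed.remove(connection)) {
            throw FakePoolException(
                "Returned connection ${connection.id} was either previously " +
                    "returned or does not belong to this connection provider"
            )
        }
    }
}

// Releases the same connection twice and returns the resulting error message.
fun demoDoubleRelease(): String? {
    val provider = TrackingProvider()
    val connection = provider.acquire()
    provider.release(connection) // first release succeeds
    return runCatching { provider.release(connection) }.exceptionOrNull()?.message
}
```

If the real provider behaves along these lines, the trace above would be consistent with a Pub/Sub connection being released a second time from the shutdown hook, or being released to a provider that did not create it. That is a guess based on the message wording, not something confirmed here.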
CONNECTION FACTORY
On AWS, RedisConfigType is Clustered and profileProperties.active = PRODUCTION.
/**
* Establishes a Redis Connection Factory with comprehensive configuration options.
*
* Provides configuration for:
* - Connection pooling and lifecycle management
* - Cluster and standalone deployment modes
* - SSL/TLS security for production environments
* - DNS resolution and caching
* - Performance tuning (thread pools, buffers, queues)
* - High availability features (topology refresh, failover)
*
* The factory supports different deployment profiles:
* - Production: Clustered Redis with SSL
* - Development: Standalone Redis without SSL
*
* @property profileProperties Configuration properties for active deployment profile
* @since 1.0.0
*/
@Configuration
@EnableRedisRepositories(
enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
keyspaceNotificationsConfigParameter = ""
)
internal class RedisConnectionFactoryConfig(
private val profileProperties: ProfileProperties
) {
/**
* Client resources for Redis connections.
*
* Manages shared resources including:
* - Thread pools for I/O and computation
* - DNS resolution and caching
* - Command latency metrics
* - Connection lifecycle
*
* This is initialized in [reactiveRedisConnectionFactory] and cleaned up in [cleanup].
* Using lateinit as the resources are created after Spring context initialization.
*/
private lateinit var clientResources: ClientResources
companion object {
private val logger = LoggerFactory.getLogger(RedisConnectionFactoryConfig::class.java)
/**
* Timeout configurations for Redis operations.
* - Command timeout: Maximum time for command execution
* - Connect timeout: Maximum time for connection establishment
* - Topology refresh: Interval for cluster topology updates
* - Adaptive refresh: Time window for topology change detection
*/
private const val DEFAULT_COMMAND_TIMEOUT_SECONDS = 10L
private const val DEFAULT_CONNECT_TIMEOUT_SECONDS = 10L
private const val TOPOLOGY_REFRESH_PERIOD_SECONDS = 20L
private const val ADAPTIVE_REFRESH_TIMEOUT_SECONDS = 5L
private const val SHUTDOWN_TIMEOUT_SECONDS = 2L
private const val SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS = 1L
/**
* Connection pool settings to optimize resource usage.
* - Max total: Maximum number of connections in the pool
* - Max/Min idle: Upper/Lower bounds for idle connections
* - Max wait: Maximum time to wait for connection
* - Eviction: Maintenance settings for idle connections
*/
private const val MAX_TOTAL_CONNECTIONS = 100
private const val MAX_IDLE_CONNECTIONS = 60
private const val MIN_IDLE_CONNECTIONS = 20
private const val MAX_WAIT_SECONDS = 120L
private const val EVICTION_RUN_PERIOD_SECONDS = 120L
private const val MIN_EVICTABLE_IDLE_MINUTES = 5L
private const val NUM_TESTS_PER_EVICTION_RUN = 3
/**
* Performance optimization parameters.
* - Decode buffer ratio: Memory allocation for response buffers
* - Request queue size: Maximum pending requests
* - Latency publish interval: Metrics publication frequency
* - Thread pool multiplier: Scaling factor for I/O threads
*/
private const val DECODE_BUFFER_POLICY_RATIO = 0.3F
private const val REQUEST_QUEUE_SIZE = 2500
private const val COMMAND_LATENCY_PUBLISH_MINUTES = 1L
private const val IO_THREAD_POOL_MULTIPLIER = 2
}
/**
* Configures Redis keyspace notifications behavior.
*
* Returns NO_OP action to disable automatic configuration of keyspace
* notifications, preventing potential connection issues during shutdown
* related to Pub/Sub connections in Spring Session.
*
* @return ConfigureRedisAction.NO_OP to disable automatic Redis configuration
*/
@Bean
fun configureRedisAction(): ConfigureRedisAction {
return ConfigureRedisAction.NO_OP
}
/* LETTUCE - reactive RedisConnectionFactory */
/**
* Creates the primary reactive Redis connection factory.
*
* Configures a Lettuce-based connection factory with:
* - Profile-specific Redis deployment mode (clustered/standalone)
* - Connection pooling
* - Client resources (thread pools, DNS resolution)
* - SSL for production environments
*
* @param clusterProperties Cluster node configuration
* @param springDataRedisProperties Redis connection properties
* @return Configured [ReactiveRedisConnectionFactory]
*/
@Bean
@Primary
internal fun reactiveRedisConnectionFactory(
clusterProperties: ClusterConfigurationProperties,
springDataRedisProperties: SpringDataRedisProperties,
): ReactiveRedisConnectionFactory {
val config = createRedisConfiguration(springDataRedisProperties, clusterProperties)
clientResources = createClientResources(springDataRedisProperties.host)
val clientConfig = createLettuceClientConfig(
clientResources,
profileProperties.active,
springDataRedisProperties.type
)
return LettuceConnectionFactory(config, clientConfig).apply {
// Apply factory options before initialization so they are in place when the factory starts
validateConnection = false
setShareNativeConnection(true)
afterPropertiesSet()
}
}
/**
* Properties class for Redis cluster configuration.
*
* @property nodes List of Redis nodes in format `host:port`
*/
@Component
internal class ClusterConfigurationProperties(
springDataRedisProperties: SpringDataRedisProperties
) {
/**
* Initial collection of known cluster nodes, each in the format `host:port`.
*/
var nodes = listOf(
"${springDataRedisProperties.host}:${springDataRedisProperties.port}",
)
}
/**
* Creates appropriate Redis configuration based on active profile.
*
* @param properties Redis connection properties
* @param clusterProperties Cluster node configuration
* @return [RedisConfiguration] for either clustered or standalone deployment
*/
private fun createRedisConfiguration(
properties: SpringDataRedisProperties,
clusterProperties: ClusterConfigurationProperties
): RedisConfiguration = when {
profileProperties.active == ProfileTypes.PRODUCTION.type &&
properties.type == RedisConfigTypes.CLUSTERED.type -> {
// Redis Cluster for production
RedisClusterConfiguration(clusterProperties.nodes).apply {
password = RedisPassword.of(properties.password)
}
}
else -> {
// Redis Standalone for non-production
RedisStandaloneConfiguration().apply {
hostName = properties.host
port = properties.port
password = RedisPassword.of(properties.password)
}
}
}
/**
* Creates client resources with optimized thread pools and DNS resolution.
*
* @param host Redis host for DNS resolution
* @return Configured [ClientResources]
*/
private fun createClientResources(host: String) = DefaultClientResources.builder()
.ioThreadPoolSize(Runtime.getRuntime().availableProcessors() * IO_THREAD_POOL_MULTIPLIER)
.computationThreadPoolSize(Runtime.getRuntime().availableProcessors())
.socketAddressResolver(createCachingDnsResolver(host))
.commandLatencyRecorder(DefaultCommandLatencyCollector.disabled())
.commandLatencyPublisherOptions { Duration.ofMinutes(COMMAND_LATENCY_PUBLISH_MINUTES) }
.build()
/**
* Creates Lettuce client configuration with pooling and security settings.
*
* Configures:
* - Read preferences (replica preferred)
* - Command timeouts
* - Connection pooling
* - SSL (for production)
*
* @param clientResources Configured client resources for connection management
* @param activeProfile Current deployment profile
* @param redisConfigType Redis deployment type (clustered or standalone)
* @return Configured [LettucePoolingClientConfiguration]
*/
private fun createLettuceClientConfig(
clientResources: ClientResources,
activeProfile: String?,
redisConfigType: String?
): LettucePoolingClientConfiguration {
val clusterClientOptions = createClusterClientOptions(activeProfile, redisConfigType)
return LettucePoolingClientConfiguration.builder()
.readFrom(REPLICA_PREFERRED)
.commandTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
.clientResources(clientResources)
.clientOptions(clusterClientOptions)
.poolConfig(buildLettucePoolConfig())
.shutdownTimeout(Duration.ofSeconds(SHUTDOWN_TIMEOUT_SECONDS))
.shutdownQuietPeriod(Duration.ofSeconds(SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS))
// conditionally use sslOptions if profileProperties.active is 'prod'
.apply {
if (activeProfile == ProfileTypes.PRODUCTION.type) {
useSsl()
}
}
.build()
}
/**
* Creates cluster client options with comprehensive connection settings.
*
* Configures:
* - Auto-reconnect behavior
* - Connection validation
* - Timeout settings
* - Socket options
* - Topology refresh
* - Buffer and queue sizes
* - SSL (for production)
*
* @param activeProfile Current deployment profile
* @param redisConfigType Redis deployment type (clustered or standalone)
* @return Configured [ClusterClientOptions]
*/
private fun createClusterClientOptions(activeProfile: String?, redisConfigType: String?): ClusterClientOptions {
val builder = ClusterClientOptions.builder()
.autoReconnect(true)
.pingBeforeActivateConnection(true)
.timeoutOptions(createTimeoutOptions())
.socketOptions(createSocketOptions())
.topologyRefreshOptions(createTopologyRefreshOptions())
.validateClusterNodeMembership(true)
.disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
.decodeBufferPolicy(DecodeBufferPolicies.ratio(DECODE_BUFFER_POLICY_RATIO))
.requestQueueSize(REQUEST_QUEUE_SIZE)
.maxRedirects(DEFAULT_MAX_REDIRECTS)
.suspendReconnectOnProtocolFailure(DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL)
.publishOnScheduler(true)
.protocolVersion(ProtocolVersion.RESP3)
// conditionally use sslOptions if profileProperties.active is 'prod'
if (activeProfile == ProfileTypes.PRODUCTION.type) {
builder.sslOptions(createSslOptions())
}
// conditionally use connected nodes if redisConfigType is 'clustered'
if (redisConfigType == RedisConfigTypes.CLUSTERED.type) {
builder.nodeFilter { node ->
node.isConnected
}
}
return builder.build()
}
/**
* Creates socket options for Redis connections.
*
* Configures:
* - Keep-alive settings
* - TCP no-delay
* - Connection timeouts
*
* @return Configured [SocketOptions]
*/
private fun createSocketOptions() = SocketOptions.builder()
.keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
.tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
.connectTimeout(Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS))
.build()
/**
* Creates timeout options for Redis commands.
*
* Configures:
* - Fixed timeout duration
* - Command timeout behavior
*
* @return Configured [TimeoutOptions]
*/
private fun createTimeoutOptions() = TimeoutOptions.builder()
.fixedTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
.timeoutCommands(true)
.build()
/**
* Creates topology refresh options for Redis cluster.
*
* Configures:
* - Periodic refresh intervals
* - Dynamic refresh sources
* - Stale connection handling
* - Adaptive refresh triggers
*
* @return Configured [ClusterTopologyRefreshOptions]
*/
private fun createTopologyRefreshOptions() = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(Duration.ofSeconds(TOPOLOGY_REFRESH_PERIOD_SECONDS))
.dynamicRefreshSources(true)
.closeStaleConnections(true)
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(ADAPTIVE_REFRESH_TIMEOUT_SECONDS))
.enableAllAdaptiveRefreshTriggers()
.build()
/**
* Creates connection pool configuration.
*
* Configures:
* - Maximum total/idle connections
* - Connection wait times
* - Eviction policies
* - Connection testing
* - Pool behavior (LIFO/FIFO)
*
* @return Configured [GenericObjectPoolConfig]
*/
private fun buildLettucePoolConfig() = GenericObjectPoolConfig<Any>().apply {
maxTotal = MAX_TOTAL_CONNECTIONS
maxIdle = MAX_IDLE_CONNECTIONS
minIdle = MIN_IDLE_CONNECTIONS
setMaxWait(Duration.ofSeconds(MAX_WAIT_SECONDS))
timeBetweenEvictionRuns = Duration.ofSeconds(EVICTION_RUN_PERIOD_SECONDS)
minEvictableIdleTime = Duration.ofMinutes(MIN_EVICTABLE_IDLE_MINUTES)
testOnBorrow = true
testWhileIdle = true
testOnReturn = true
blockWhenExhausted = true
lifo = true
jmxEnabled = false
fairness = true
evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
numTestsPerEvictionRun = NUM_TESTS_PER_EVICTION_RUN
}
/**
* Creates SSL options for secure Redis connections.
*
* Configures JDK-based SSL provider for Redis connections
* in production environments.
*
* @return Configured [SslOptions]
*/
private fun createSslOptions(): SslOptions = SslOptions.builder()
.jdkSslProvider()
.build()
/**
* Creates DNS resolver with caching capabilities.
*
* Implements:
* - DNS resolution caching
* - Hostname-to-IP mapping
* - Fallback handling for resolution failures
*
* @param host Redis host to resolve
* @return Configured [MappingSocketAddressResolver]
*/
private fun createCachingDnsResolver(host: String): MappingSocketAddressResolver {
val dnsCache = ConcurrentHashMap<String, Array<InetAddress>>()
val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
val addresses = dnsCache.computeIfAbsent(host) {
try {
DnsResolvers.JVM_DEFAULT.resolve(host)
} catch (e: UnknownHostException) {
emptyArray()
}
}
val cacheIP = addresses.firstOrNull()?.hostAddress
if (hostAndPort.hostText == cacheIP) {
HostAndPort.of(host, hostAndPort.port)
} else {
hostAndPort
}
}
return MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)
}
}
Apparently, the Spring Data Redis team does recognise this as an issue, but says it is something the Spring Session team should look into.
The full explanation, with their responses, is here:
spring-projects/spring-data-redis#3075
Can a solution be found?
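In the meantime, one experiment that might help narrow things down is to build the factory without connection pooling, since the exception is thrown from `LettucePoolingConnectionProvider`. The sketch below is untested against ElastiCache and is not a confirmed fix. `nonPooledClusterFactory` and its `nodes` parameter are hypothetical names, and the timeout values are copied from the constants in the configuration above. It relies on `LettuceClientConfiguration` (the non-pooling builder) in place of `LettucePoolingClientConfiguration`.

```kotlin
import org.springframework.data.redis.connection.RedisClusterConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import java.time.Duration

// Hypothetical helper: builds a cluster connection factory without a pool,
// so connections are not handed out through LettucePoolingConnectionProvider.
fun nonPooledClusterFactory(nodes: List<String>): LettuceConnectionFactory {
    // Each node is expected in `host:port` form, as in ClusterConfigurationProperties
    val clusterConfig = RedisClusterConfiguration(nodes)

    val clientConfig = LettuceClientConfiguration.builder()
        .commandTimeout(Duration.ofSeconds(10)) // same value as DEFAULT_COMMAND_TIMEOUT_SECONDS
        .shutdownTimeout(Duration.ofSeconds(2)) // same value as SHUTDOWN_TIMEOUT_SECONDS
        .useSsl() // assumes TLS is enabled on the cluster, as in the production profile
        .build()

    return LettuceConnectionFactory(clusterConfig, clientConfig)
}
```

If the error disappears with this variant, that would point at the interaction between pooling and the Pub/Sub connection at shutdown. If it persists in another form, the pool is probably not the root cause.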