AWS Clustered issue - how can I resolve in Spring - redis.connection.PoolException error #3076

Closed
dreamstar-enterprises opened this issue Dec 15, 2024 · 3 comments
Labels
for: external-project For an external project and not something we can fix

Comments

@dreamstar-enterprises

I switched from AWS ElastiCache Serverless to AWS ElastiCache Clustered because I kept getting a psubscribe error, and no change I made on the Spring side would remove it.

With AWS ElastiCache Clustered the psubscribe error is gone, but after about 15-20 minutes I consistently get the error below.

Do you know how I can find out what is causing it, and how to resolve it?

Thanks in advance

ERROR

Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@43b89ef5 was either previously returned or does not belong to this connection provider

Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@4c4f02d6 was either previously returned or does not belong to this connection provider

2024-12-15T15:41:34.277Z ERROR 1 --- [BFFApplication] [ionShutdownHook] reactor.core.publisher.Operators : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@530629a0 was either previously returned or does not belong to this connection provider

Caused by: org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.cluster.StatefulRedisClusterPubSubConnectionImpl@530629a0 was either previously returned or does not belong to this connection provider

at org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider.releaseAsync(LettucePoolingConnectionProvider.java:192) ~[spring-data-redis-3.3.3.jar!/:3.3.3]

at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.releaseAsync(LettuceConnectionFactory.java:1834) ~[spring-data-redis-3.3.3.jar!/:3.3.3]

at org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection$AsyncConnect.lambda$close$3(LettuceReactiveRedisConnection.java:373) ~[spring-data-redis-3.3.3.jar!/:3.3.3]

at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:45) ~[reactor-core-3.6.9.jar!/:3.6.9]

at reactor.core.publisher.Mono.subscribe(Mono.java:4576) ~[reactor-core-3.6.9.jar!/:3.6.9]
...

at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114) ~[spring-boot-3.3.3.jar!/:3.3.3]

at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
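
The log line above says the exception reached Reactor's default onErrorDropped handler. One way to gather more context about such errors is to install a custom hook. The following is a minimal sketch, assuming reactor-core is on the classpath; `droppedErrors` and `installDroppedErrorHook` are hypothetical names introduced for illustration and are not part of the reported application.

```kotlin
import reactor.core.publisher.Hooks
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical in-memory sink; a real application would more likely log these entries.
val droppedErrors = CopyOnWriteArrayList<String>()

/**
 * Replaces Reactor's default onErrorDropped handling with a hook that records
 * the thread name next to the error message. In the trace above the drop
 * happens on the shutdown hook thread, so the thread name is useful context.
 */
fun installDroppedErrorHook() {
    Hooks.onErrorDropped { error ->
        droppedErrors.add("${Thread.currentThread().name}: ${error.message}")
    }
}
```

Calling `installDroppedErrorHook()` once at startup is enough. `Hooks.resetOnErrorDropped()` restores the default behaviour. The hook only makes the dropped error easier to observe; it does not change when the connection is released.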

CONNECTION FACTORY

On AWS, RedisConfigType is Clustered and profileProperties.active = PRODUCTION.

/**
 * Establishes a Redis Connection Factory with comprehensive configuration options.
 *
 * Provides configuration for:
 * - Connection pooling and lifecycle management
 * - Cluster and standalone deployment modes
 * - SSL/TLS security for production environments
 * - DNS resolution and caching
 * - Performance tuning (thread pools, buffers, queues)
 * - High availability features (topology refresh, failover)
 *
 * The factory supports different deployment profiles:
 * - Production: Clustered Redis with SSL
 * - Development: Standalone Redis without SSL
 *
 * @property profileProperties Configuration properties for active deployment profile
 * @since 1.0.0
 */
@Configuration
@EnableRedisRepositories(
    enableKeyspaceEvents = RedisKeyValueAdapter.EnableKeyspaceEvents.OFF,
    keyspaceNotificationsConfigParameter = ""
)
internal class RedisConnectionFactoryConfig(
    private val profileProperties: ProfileProperties
) {

    /**
     * Client resources for Redis connections.
     *
     * Manages shared resources including:
     * - Thread pools for I/O and computation
     * - DNS resolution and caching
     * - Command latency metrics
     * - Connection lifecycle
     *
     * This is initialized in [reactiveRedisConnectionFactory] and cleaned up in [cleanup].
     * Using lateinit as the resources are created after Spring context initialization.
     */
    private lateinit var clientResources: ClientResources

    companion object {

        private val logger = LoggerFactory.getLogger(RedisConnectionFactoryConfig::class.java)

        /**
         * Timeout configurations for Redis operations.
         * - Command timeout: Maximum time for command execution
         * - Connect timeout: Maximum time for connection establishment
         * - Topology refresh: Interval for cluster topology updates
         * - Adaptive refresh: Time window for topology change detection
         */
        private const val DEFAULT_COMMAND_TIMEOUT_SECONDS = 10L
        private const val DEFAULT_CONNECT_TIMEOUT_SECONDS = 10L
        private const val TOPOLOGY_REFRESH_PERIOD_SECONDS = 20L
        private const val ADAPTIVE_REFRESH_TIMEOUT_SECONDS = 5L
        private const val SHUTDOWN_TIMEOUT_SECONDS = 2L
        private const val SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS = 1L

        /**
         * Connection pool settings to optimize resource usage.
         * - Max total: Maximum number of connections in the pool
         * - Max/Min idle: Upper/Lower bounds for idle connections
         * - Max wait: Maximum time to wait for connection
         * - Eviction: Maintenance settings for idle connections
         */
        private const val MAX_TOTAL_CONNECTIONS = 100
        private const val MAX_IDLE_CONNECTIONS = 60
        private const val MIN_IDLE_CONNECTIONS = 20
        private const val MAX_WAIT_SECONDS = 120L
        private const val EVICTION_RUN_PERIOD_SECONDS = 120L
        private const val MIN_EVICTABLE_IDLE_MINUTES = 5L
        private const val NUM_TESTS_PER_EVICTION_RUN = 3

        /**
         * Performance optimization parameters.
         * - Decode buffer ratio: Memory allocation for response buffers
         * - Request queue size: Maximum pending requests
         * - Latency publish interval: Metrics publication frequency
         * - Thread pool multiplier: Scaling factor for I/O threads
         */
        private const val DECODE_BUFFER_POLICY_RATIO = 0.3F
        private const val REQUEST_QUEUE_SIZE = 2500
        private const val COMMAND_LATENCY_PUBLISH_MINUTES = 1L
        private const val IO_THREAD_POOL_MULTIPLIER = 2
    }

    /**
     * Configures Redis keyspace notifications behavior.
     *
     * Returns NO_OP action to disable automatic configuration of keyspace
     * notifications, preventing potential connection issues during shutdown
     * related to Pub/Sub connections in Spring Session.
     *
     * @return ConfigureRedisAction.NO_OP to disable automatic Redis configuration
     */
    @Bean
    fun configureRedisAction(): ConfigureRedisAction {
        return ConfigureRedisAction.NO_OP
    }

    /* LETTUCE - reactive RedisConnectionFactory */
    /**
     * Creates the primary reactive Redis connection factory.
     *
     * Configures a Lettuce-based connection factory with:
     * - Profile-specific Redis deployment mode (clustered/standalone)
     * - Connection pooling
     * - Client resources (thread pools, DNS resolution)
     * - SSL for production environments
     *
     * @param clusterProperties Cluster node configuration
     * @param springDataRedisProperties Redis connection properties
     * @return Configured [ReactiveRedisConnectionFactory]
     */
    @Bean
    @Primary
    internal fun reactiveRedisConnectionFactory(
        clusterProperties: ClusterConfigurationProperties,
        springDataRedisProperties: SpringDataRedisProperties,
    ): ReactiveRedisConnectionFactory {
        val config = createRedisConfiguration(springDataRedisProperties, clusterProperties)
        clientResources = createClientResources(springDataRedisProperties.host)
        val clientConfig = createLettuceClientConfig(
            clientResources,
            profileProperties.active,
            springDataRedisProperties.type
        )

        return LettuceConnectionFactory(config, clientConfig).apply {
            // Set properties before initialization so they are in place when the factory starts.
            validateConnection = false
            setShareNativeConnection(true)
            afterPropertiesSet()
        }
    }

    /**
     * Properties class for Redis cluster configuration.
     *
     * @property nodes List of Redis nodes in format `host:port`
     */
    @Component
    internal class ClusterConfigurationProperties(
        springDataRedisProperties: SpringDataRedisProperties
    ) {
        /**
         * Get initial collection of known cluster nodes in format `host:port`.
         * @return
         */
        var nodes = listOf(
            "${springDataRedisProperties.host}:${springDataRedisProperties.port}",
        )
    }

    /**
     * Creates appropriate Redis configuration based on active profile.
     *
     * @param properties Redis connection properties
     * @param clusterProperties Cluster node configuration
     * @return [RedisConfiguration] for either clustered or standalone deployment
     */
    private fun createRedisConfiguration(
        properties: SpringDataRedisProperties,
        clusterProperties: ClusterConfigurationProperties
    ): RedisConfiguration = when {
        profileProperties.active == ProfileTypes.PRODUCTION.type &&
                properties.type == RedisConfigTypes.CLUSTERED.type -> {

            // Redis Cluster for production
            RedisClusterConfiguration(clusterProperties.nodes).apply {
                password = RedisPassword.of(properties.password)
            }
        }
        else -> {

            // Redis Standalone for non-production
            RedisStandaloneConfiguration().apply {
                hostName = properties.host
                port = properties.port
                password = RedisPassword.of(properties.password)
            }
        }
    }

    /**
     * Creates client resources with optimized thread pools and DNS resolution.
     *
     * @param host Redis host for DNS resolution
     * @return Configured [ClientResources]
     */
    private fun createClientResources(host: String) = DefaultClientResources.builder()
        .ioThreadPoolSize(Runtime.getRuntime().availableProcessors() * IO_THREAD_POOL_MULTIPLIER)
        .computationThreadPoolSize(Runtime.getRuntime().availableProcessors())
        .socketAddressResolver(createCachingDnsResolver(host))
        .commandLatencyRecorder(DefaultCommandLatencyCollector.disabled())
        .commandLatencyPublisherOptions { Duration.ofMinutes(COMMAND_LATENCY_PUBLISH_MINUTES) }
        .build()

    /**
     * Creates Lettuce client configuration with pooling and security settings.
     *
     * Configures:
     * - Read preferences (replica preferred)
     * - Command timeouts
     * - Connection pooling
     * - SSL (for production)
     *
     * @param clientResources Configured client resources for connection management
     * @param activeProfile Current deployment profile
     * @param redisConfigType Redis deployment type (clustered or standalone)
     * @return Configured [LettucePoolingClientConfiguration]
     */
    private fun createLettuceClientConfig(
        clientResources: ClientResources,
        activeProfile: String?,
        redisConfigType: String?
    ): LettucePoolingClientConfiguration {
        val clusterClientOptions = createClusterClientOptions(activeProfile, redisConfigType)

        return LettucePoolingClientConfiguration.builder()
            .readFrom(REPLICA_PREFERRED)
            .commandTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
            .clientResources(clientResources)
            .clientOptions(clusterClientOptions)
            .poolConfig(buildLettucePoolConfig())
            .shutdownTimeout(Duration.ofSeconds(SHUTDOWN_TIMEOUT_SECONDS))
            .shutdownQuietPeriod(Duration.ofSeconds(SHUTDOWN_QUIET_PERIOD_TIMEOUT_SECONDS))
            // conditionally use sslOptions if profileProperties.active is 'prod'
            .apply {
                if (activeProfile == ProfileTypes.PRODUCTION.type) {
                    useSsl()
                }
            }
            .build()
    }

    /**
     * Creates cluster client options with comprehensive connection settings.
     *
     * Configures:
     * - Auto-reconnect behavior
     * - Connection validation
     * - Timeout settings
     * - Socket options
     * - Topology refresh
     * - Buffer and queue sizes
     * - SSL (for production)
     *
     * @param activeProfile Current deployment profile
     * @param redisConfigType Redis deployment type (clustered or standalone)
     * @return Configured [ClusterClientOptions]
     */
    private fun createClusterClientOptions(activeProfile: String?, redisConfigType: String?): ClusterClientOptions {
        val builder = ClusterClientOptions.builder()
            .autoReconnect(true)
            .pingBeforeActivateConnection(true)
            .timeoutOptions(createTimeoutOptions())
            .socketOptions(createSocketOptions())
            .topologyRefreshOptions(createTopologyRefreshOptions())
            .validateClusterNodeMembership(true)
            .disconnectedBehavior(DEFAULT_DISCONNECTED_BEHAVIOR)
            .decodeBufferPolicy(DecodeBufferPolicies.ratio(DECODE_BUFFER_POLICY_RATIO))
            .requestQueueSize(REQUEST_QUEUE_SIZE)
            .maxRedirects(DEFAULT_MAX_REDIRECTS)
            .suspendReconnectOnProtocolFailure(DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL)
            .publishOnScheduler(true)
            .protocolVersion(ProtocolVersion.RESP3)

        // conditionally use sslOptions if profileProperties.active is 'prod'
        if (activeProfile == ProfileTypes.PRODUCTION.type) {
            builder.sslOptions(createSslOptions())
        }

        // conditionally use connected nodes if redisConfigType is 'clustered'
        if (redisConfigType == RedisConfigTypes.CLUSTERED.type) {
            builder.nodeFilter { node ->
                node.isConnected
            }
        }

        return builder.build()
    }

    /**
     * Creates socket options for Redis connections.
     *
     * Configures:
     * - Keep-alive settings
     * - TCP no-delay
     * - Connection timeouts
     *
     * @return Configured [SocketOptions]
     */
    private fun createSocketOptions() = SocketOptions.builder()
        .keepAlive(SocketOptions.DEFAULT_SO_KEEPALIVE)
        .tcpNoDelay(SocketOptions.DEFAULT_SO_NO_DELAY)
        .connectTimeout(Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS))
        .build()

    /**
     * Creates timeout options for Redis commands.
     *
     * Configures:
     * - Fixed timeout duration
     * - Command timeout behavior
     *
     * @return Configured [TimeoutOptions]
     */
    private fun createTimeoutOptions() = TimeoutOptions.builder()
        .fixedTimeout(Duration.ofSeconds(DEFAULT_COMMAND_TIMEOUT_SECONDS))
        .timeoutCommands(true)
        .build()

    /**
     * Creates topology refresh options for Redis cluster.
     *
     * Configures:
     * - Periodic refresh intervals
     * - Dynamic refresh sources
     * - Stale connection handling
     * - Adaptive refresh triggers
     *
     * @return Configured [ClusterTopologyRefreshOptions]
     */
    private fun createTopologyRefreshOptions() = ClusterTopologyRefreshOptions.builder()
        .enablePeriodicRefresh(Duration.ofSeconds(TOPOLOGY_REFRESH_PERIOD_SECONDS))
        .dynamicRefreshSources(true)
        .closeStaleConnections(true)
        .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(ADAPTIVE_REFRESH_TIMEOUT_SECONDS))
        .enableAllAdaptiveRefreshTriggers()
        .build()

    /**
     * Creates connection pool configuration.
     *
     * Configures:
     * - Maximum total/idle connections
     * - Connection wait times
     * - Eviction policies
     * - Connection testing
     * - Pool behavior (LIFO/FIFO)
     *
     * @return Configured [GenericObjectPoolConfig]
     */
    private fun buildLettucePoolConfig() = GenericObjectPoolConfig<Any>().apply {
        maxTotal = MAX_TOTAL_CONNECTIONS
        maxIdle = MAX_IDLE_CONNECTIONS
        minIdle = MIN_IDLE_CONNECTIONS
        setMaxWait(Duration.ofSeconds(MAX_WAIT_SECONDS))
        timeBetweenEvictionRuns = Duration.ofSeconds(EVICTION_RUN_PERIOD_SECONDS)
        minEvictableIdleTime = Duration.ofMinutes(MIN_EVICTABLE_IDLE_MINUTES)
        testOnBorrow = true
        testWhileIdle = true
        testOnReturn = true
        blockWhenExhausted = true
        lifo = true
        jmxEnabled = false
        fairness = true
        evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
        numTestsPerEvictionRun = NUM_TESTS_PER_EVICTION_RUN
    }

    /**
     * Creates SSL options for secure Redis connections.
     *
     * Configures JDK-based SSL provider for Redis connections
     * in production environments.
     *
     * @return Configured [SslOptions]
     */
    private fun createSslOptions(): SslOptions = SslOptions.builder()
        .jdkSslProvider()
        .build()

    /**
     * Creates DNS resolver with caching capabilities.
     *
     * Implements:
     * - DNS resolution caching
     * - Hostname-to-IP mapping
     * - Fallback handling for resolution failures
     *
     * @param host Redis host to resolve
     * @return Configured [MappingSocketAddressResolver]
     */
    private fun createCachingDnsResolver(host: String): MappingSocketAddressResolver {
        val dnsCache = ConcurrentHashMap<String, Array<InetAddress>>()

        val mappingFunction: (HostAndPort) -> HostAndPort = { hostAndPort ->
            val addresses = dnsCache.computeIfAbsent(host) {
                try {
                    DnsResolvers.JVM_DEFAULT.resolve(host)
                } catch (e: UnknownHostException) {
                    emptyArray()
                }
            }

            val cacheIP = addresses.firstOrNull()?.hostAddress
            if (hostAndPort.hostText == cacheIP) {
                HostAndPort.of(host, hostAndPort.port)
            } else {
                hostAndPort
            }
        }

        return MappingSocketAddressResolver.create(DnsResolvers.JVM_DEFAULT, mappingFunction)
    }

}
dreamstar-enterprises changed the title from "AWS Clustered issue - psubscribe error - how can I disable in Spring - redis.connection.PoolException" to "AWS Clustered issue - how can I resolve in Spring - redis.connection.PoolException error" on Dec 15, 2024
spring-projects-issues added the "status: waiting-for-triage" label (An issue we've not yet triaged) on Dec 15, 2024
@dreamstar-enterprises
Author

dreamstar-enterprises commented Dec 15, 2024

The error message (though not the underlying error) comes from the release method of the class LettucePoolingConnectionProvider, v 2.0, shown below.
It happens with both AWS ElastiCache Redis / Valkey Clustered and AWS ElastiCache Redis / Valkey Serverless.

@Override
public void release(StatefulConnection<?, ?> connection) {

	GenericObjectPool<StatefulConnection<?, ?>> pool = poolRef.remove(connection);

	if (pool == null) {

		AsyncPool<StatefulConnection<?, ?>> asyncPool = asyncPoolRef.remove(connection);

		if (asyncPool == null) {
			throw new PoolException("Returned connection " + connection
					+ " was either previously returned or does not belong to this connection provider");
		}

		discardIfNecessary(connection);
		asyncPool.release(connection).join();
		return;
	}

	discardIfNecessary(connection);
	pool.returnObject(connection);
}
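
The bookkeeping in the method above can be illustrated with a small standalone model. This is only a sketch: `ModelPoolingProvider`, `ModelConnection`, `ModelPoolException` and `releaseOutcome` are hypothetical names, the model tracks a single map instead of separate synchronous and asynchronous pools, and it is not the Spring Data Redis implementation.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

/** Thrown by the model when a release cannot be matched to a borrowed connection. */
class ModelPoolException(message: String) : RuntimeException(message)

/** Hypothetical stand-in for a connection; equality is by identity. */
class ModelConnection(val id: Int)

/** Simplified model of the borrow/release bookkeeping. */
class ModelPoolingProvider {
    // Maps each borrowed connection to the (named) pool it came from.
    private val poolRef = ConcurrentHashMap<ModelConnection, String>()
    private val nextId = AtomicInteger()

    fun acquire(): ModelConnection {
        val connection = ModelConnection(nextId.getAndIncrement())
        poolRef[connection] = "pool-0"
        return connection
    }

    fun release(connection: ModelConnection) {
        // remove() yields null when the connection is not tracked any more,
        // or was never handed out by this provider.
        poolRef.remove(connection) ?: throw ModelPoolException(
            "Returned connection ${connection.id} was either previously returned " +
                "or does not belong to this connection provider"
        )
    }
}

/** Runs [action] and reports whether it completed or hit the model exception. */
fun releaseOutcome(action: () -> Unit): String =
    try {
        action()
        "ok"
    } catch (e: ModelPoolException) {
        "PoolException"
    }
```

In this model the first release of a borrowed connection succeeds, while a second release of the same connection, or a release of a connection the provider never handed out, ends in the exception. Those are the two cases named in the error message.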

@mp911de
Member

mp911de commented Dec 16, 2024

It looks like the Redis connection has been returned to the pool after pool disposal. Since this is in the context of Spring Session, I suggest filing a ticket there.

at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114) ~[spring-boot-3.3.3.jar!/:3.3.3]
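
The ordering described in this comment can be sketched with a toy model. It assumes that disposing the provider drops its record of borrowed connections, so that a release arriving afterwards finds nothing to match. `DisposableProvider` and `shutdownSequence` are hypothetical names used only for illustration.

```kotlin
/** Simplified model of a provider whose bookkeeping is dropped on disposal. */
class DisposableProvider {
    private val borrowed = mutableSetOf<String>()

    fun acquire(name: String): String {
        borrowed.add(name)
        return name
    }

    /** Models pool disposal: every tracked connection is forgotten. */
    fun destroy() {
        borrowed.clear()
    }

    /** Returns true if the connection was still tracked, false otherwise. */
    fun release(name: String): Boolean = borrowed.remove(name)
}

/**
 * Plays one shutdown sequence and reports whether the release was accepted.
 * With releaseBeforeDestroy = false the release arrives after disposal,
 * which is the situation the comment above describes.
 */
fun shutdownSequence(releaseBeforeDestroy: Boolean): Boolean {
    val provider = DisposableProvider()
    val connection = provider.acquire("pubsub-connection")
    if (releaseBeforeDestroy) {
        val accepted = provider.release(connection)
        provider.destroy()
        return accepted
    }
    provider.destroy()
    return provider.release(connection)
}
```

In the real provider the rejected release surfaces as the PoolException shown in the report rather than as a `false` return value.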

mp911de closed this as not planned on Dec 16, 2024
mp911de added the "for: external-project" label (For an external project and not something we can fix) and removed the "status: waiting-for-triage" label on Dec 16, 2024
@dreamstar-enterprises
Author

Apparently setting testOnReturn to false in the pool configuration helped. I have no idea why; perhaps it was a glitch:

    /**
     * Creates connection pool configuration.
     *
     * Configures:
     * - Maximum total/idle connections
     * - Connection wait times
     * - Eviction policies
     * - Connection testing
     * - Pool behavior (LIFO/FIFO)
     *
     * @return Configured [GenericObjectPoolConfig]
     */
    private fun buildLettucePoolConfig() = GenericObjectPoolConfig<Any>().apply {
        maxTotal = MAX_TOTAL_CONNECTIONS
        maxIdle = MAX_IDLE_CONNECTIONS
        minIdle = MIN_IDLE_CONNECTIONS
        setMaxWait(Duration.ofSeconds(MAX_WAIT_SECONDS))
        timeBetweenEvictionRuns = Duration.ofSeconds(EVICTION_RUN_PERIOD_SECONDS)
        minEvictableIdleTime = Duration.ofMinutes(MIN_EVICTABLE_IDLE_MINUTES)
        testOnBorrow = true
        testWhileIdle = true
        testOnReturn = false // <-- keep this false on AWS ElastiCache Clustered or Serverless
        blockWhenExhausted = true
        lifo = true
        jmxEnabled = false
        fairness = true
        evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
        numTestsPerEvictionRun = NUM_TESTS_PER_EVICTION_RUN
    }
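
For readers unsure what testOnReturn changes, the following sketch shows the flag's effect in Apache Commons Pool 2 on its own, without Redis. It assumes commons-pool2 is on the classpath. `FakeConnection`, `FakeConnectionFactory` and `destroyedAfterReturningClosed` are hypothetical names, and the `open` flag is only a stand-in for whatever validation a real connection factory performs.

```kotlin
import org.apache.commons.pool2.BasePooledObjectFactory
import org.apache.commons.pool2.PooledObject
import org.apache.commons.pool2.impl.DefaultPooledObject
import org.apache.commons.pool2.impl.GenericObjectPool
import org.apache.commons.pool2.impl.GenericObjectPoolConfig

/** Hypothetical stand-in for a connection; `open` mimics an "is it still usable" check. */
class FakeConnection(var open: Boolean = true)

/** Factory whose validation passes only while the fake connection is open. */
class FakeConnectionFactory : BasePooledObjectFactory<FakeConnection>() {
    override fun create(): FakeConnection = FakeConnection()

    override fun wrap(obj: FakeConnection): PooledObject<FakeConnection> =
        DefaultPooledObject(obj)

    override fun validateObject(p: PooledObject<FakeConnection>): Boolean =
        p.getObject().open
}

/**
 * Borrows one object, marks it closed, returns it to the pool, and reports
 * how many objects the pool had destroyed at that point.
 */
fun destroyedAfterReturningClosed(validateOnReturn: Boolean): Long {
    val config = GenericObjectPoolConfig<FakeConnection>().apply {
        testOnReturn = validateOnReturn
        jmxEnabled = false
    }
    val pool = GenericObjectPool(FakeConnectionFactory(), config)
    try {
        val connection = pool.borrowObject()
        connection.open = false
        // With testOnReturn enabled, the pool validates here and discards invalid objects.
        pool.returnObject(connection)
        return pool.destroyedCount
    } finally {
        pool.close()
    }
}
```

With validation on return enabled, the closed object is destroyed when it is handed back; with it disabled, the object goes back to the idle set unchecked. This shows only what the flag does in the pool. It does not by itself establish why the change made the PoolException disappear in the setup above.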
