ReactiveRedisTemplate.listenToLater throws CancellationException Disconnected when used #2489

Closed
cfredri4 opened this issue Jan 16, 2023 · 2 comments
Assignees: mp911de
Labels: status: invalid (an issue that we don't feel is valid)

@cfredri4

I'm new to reactive programming, so either I misunderstand something or something is broken with ReactiveRedisTemplate.listenToLater. Reproducing tests are below: listenToChannel works fine and prints bar, but listenToChannelLater throws CancellationException: Disconnected (full stack trace after the tests). Tested using 3.0.0.

import org.junit.jupiter.api.Test;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;

public class ListenToChannelLaterTest {
    
    @Test
    public void listenToChannel() throws Exception {
        
        var redisConfig = new RedisStandaloneConfiguration("127.0.0.1", 6379);
        var lettuceConfig = LettucePoolingClientConfiguration.defaultConfiguration();
        var connectionFactory = new LettuceConnectionFactory(redisConfig, lettuceConfig);
        connectionFactory.afterPropertiesSet();
        
        var template = new ReactiveStringRedisTemplate(connectionFactory);
        
        var sub = template.listenToChannel("foo")
                .subscribe(System.out::println, Throwable::printStackTrace);
        
        Thread.sleep(1000);
        
        template.convertAndSend("foo", "bar").block();
        
        sub.dispose();
    }
    
    @Test
    public void listenToChannelLater() throws Exception {
        
        var redisConfig = new RedisStandaloneConfiguration("127.0.0.1", 6379);
        var lettuceConfig = LettucePoolingClientConfiguration.defaultConfiguration();
        var connectionFactory = new LettuceConnectionFactory(redisConfig, lettuceConfig);
        connectionFactory.afterPropertiesSet();
        
        var template = new ReactiveStringRedisTemplate(connectionFactory);
        
        var sub = template.listenToChannelLater("foo")
                .flatMapMany(f -> f)
                .subscribe(System.out::println, Throwable::printStackTrace);
        
        Thread.sleep(1000);
        
        template.convertAndSend("foo", "bar").block();
        
        sub.dispose();
    }    
}
java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxPublish$PublishSubscriber.disconnectAction(FluxPublish.java:327)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.dispose(FluxPublish.java:318)
	at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription$State.terminate(LettuceReactiveSubscription.java:263)
	at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription.lambda$cancel$8(LettuceReactiveSubscription.java:147)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
	at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onComplete(RedisPublisher.java:896)
	at io.lettuce.core.RedisPublisher$State.onAllDataRead(RedisPublisher.java:698)
	at io.lettuce.core.RedisPublisher$State$3.read(RedisPublisher.java:608)
	at io.lettuce.core.RedisPublisher$State$3.onDataAvailable(RedisPublisher.java:565)
	at io.lettuce.core.RedisPublisher$RedisSubscription.onDataAvailable(RedisPublisher.java:326)
	at io.lettuce.core.RedisPublisher$RedisSubscription.onAllDataRead(RedisPublisher.java:341)
	at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:778)
	at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65)
	at io.lettuce.core.pubsub.PubSubCommandHandler.completeCommand(PubSubCommandHandler.java:260)
	at io.lettuce.core.pubsub.PubSubCommandHandler.notifyPushListeners(PubSubCommandHandler.java:220)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:646)
	at io.lettuce.core.pubsub.PubSubCommandHandler.decode(PubSubCommandHandler.java:112)
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
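
For context on the two tests above: as far as I understand the Javadoc, the Mono returned by listenToChannelLater completes once the subscription is registered, so it is meant to replace a fixed wait such as Thread.sleep(1000) before publishing. The sketch below only illustrates that "signal first, then messages" contract with plain JDK classes. It is not Spring Data Redis or Lettuce code, and every name in it (SubscribeLaterSketch, listenLater, publish, demo) is hypothetical. It does not reproduce or explain the CancellationException.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in for a pub/sub channel (assumption, not a real Redis API).
class SubscribeLaterSketch {

    // Inboxes of subscribers whose registration has completed.
    private final List<BlockingQueue<String>> registered = new CopyOnWriteArrayList<>();

    // Registers on another thread. The future completes only after registration,
    // playing the role of the outer Mono in Mono<Flux<Message>>.
    CompletableFuture<BlockingQueue<String>> listenLater() {
        return CompletableFuture.supplyAsync(() -> {
            BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
            registered.add(inbox);
            return inbox;
        });
    }

    // Delivers only to already-registered subscribers and returns how many received it.
    // Messages published before registration are dropped, as with pub/sub.
    int publish(String message) {
        for (BlockingQueue<String> inbox : registered) {
            inbox.add(message);
        }
        return registered.size();
    }

    // Waits for the registration signal instead of sleeping, then publishes and reads.
    static String demo() {
        try {
            SubscribeLaterSketch channel = new SubscribeLaterSketch();
            BlockingQueue<String> inbox = channel.listenLater().get(5, TimeUnit.SECONDS);
            channel.publish("bar");
            return inbox.poll(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "bar"
    }
}
```

Applied to the listenToChannelLater test, the equivalent idea would be to obtain the inner Flux only after the outer Mono has completed and to publish afterwards, rather than relying on a sleep.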
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Jan 16, 2023
@cfredri4
Author

Just saw that the tests for these are disabled for what looks like this very reason.

Interestingly, the normal ("non-later") versions are also disabled for the same reason. Is that really correct? I have never encountered this error with them.

@mp911de mp911de self-assigned this Sep 21, 2023
@mp911de mp911de added status: invalid An issue that we don't feel is valid and removed status: waiting-for-triage An issue we've not yet triaged labels Sep 21, 2023
mp911de added a commit that referenced this issue Sep 21, 2023
@mp911de
Member

mp911de commented Sep 21, 2023

I was no longer able to reproduce the issue. Feel free to reopen it. Meanwhile, we re-enabled our tests.

@mp911de mp911de closed this as completed Sep 21, 2023