From 1deb7bd2003c6a8fb5973dd2ff1ac206e2f870fb Mon Sep 17 00:00:00 2001 From: Tihomir Krasimirov Mateev Date: Tue, 5 Nov 2024 14:40:03 +0200 Subject: [PATCH] WATCH is now working in the same time as MULTI when called inside a MULTI (#3027) --- .../core/FutureSyncInvocationHandler.java | 3 +- .../core/StatefulRedisConnectionImpl.java | 54 +++++++++---------- .../TransactionCommandIntegrationTests.java | 24 +++++++++ .../test/ReactiveSyncInvocationHandler.java | 3 +- 4 files changed, 54 insertions(+), 30 deletions(-) diff --git a/src/main/java/io/lettuce/core/FutureSyncInvocationHandler.java b/src/main/java/io/lettuce/core/FutureSyncInvocationHandler.java index c251a35310..b5fd5bd6da 100644 --- a/src/main/java/io/lettuce/core/FutureSyncInvocationHandler.java +++ b/src/main/java/io/lettuce/core/FutureSyncInvocationHandler.java @@ -100,7 +100,8 @@ private static boolean isTransactionActive(StatefulConnection connection) private static boolean isTxControlMethod(String methodName, Object[] args) { - if (methodName.equals("exec") || methodName.equals("multi") || methodName.equals("discard")) { + if (methodName.equals("exec") || methodName.equals("multi") || methodName.equals("discard") + || methodName.equals("watch")) { return true; } diff --git a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java index aef29c99bb..a26c80a856 100644 --- a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java +++ b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java @@ -166,9 +166,6 @@ public boolean isMulti() { public RedisCommand dispatch(RedisCommand command) { RedisCommand toSend = preProcessCommand(command); - - potentiallyEnableMulti(command); - return super.dispatch(toSend); } @@ -179,35 +176,21 @@ public RedisCommand dispatch(RedisCommand command) { commands.forEach(o -> { RedisCommand command = preProcessCommand(o); - sentCommands.add(command); - potentiallyEnableMulti(command); }); return super.dispatch(sentCommands); } - private void potentiallyEnableMulti(RedisCommand command) { - - if (command.getType().toString().equals(MULTI.name())) { - - multi = (multi == null ? new MultiOutput<>(codec) : multi); - - if (command instanceof CompleteableCommand) { - ((CompleteableCommand) command).onComplete((ignored, e) -> { - if (e != null) { - multi = null; - } - }); - } - } - } - + // TODO [tihomir.mateev] Refactor to include as part of the Command interface + // All these if statements clearly indicate this is a problem best solve by each command + // (defining a pre and post processing behaviour of the command) protected RedisCommand preProcessCommand(RedisCommand command) { RedisCommand local = command; + String commandType = command.getType().toString(); - if (local.getType().toString().equals(AUTH.name())) { + if (commandType.equals(AUTH.name())) { local = attachOnComplete(local, status -> { if ("OK".equals(status)) { @@ -224,7 +207,7 @@ protected RedisCommand preProcessCommand(RedisCommand comm }); } - if (local.getType().toString().equals(SELECT.name())) { + if (commandType.equals(SELECT.name())) { local = attachOnComplete(local, status -> { if ("OK".equals(status)) { Long db = CommandArgsAccessor.getFirstInteger(command.getArgs()); @@ -235,7 +218,7 @@ protected RedisCommand preProcessCommand(RedisCommand comm }); } - if (local.getType().toString().equals(READONLY.name())) { + if (commandType.equals(READONLY.name())) { local = attachOnComplete(local, status -> { if ("OK".equals(status)) { state.setReadOnly(true); @@ -243,7 +226,7 @@ protected RedisCommand preProcessCommand(RedisCommand comm }); } - if (local.getType().toString().equals(READWRITE.name())) { + if (commandType.equals(READWRITE.name())) { local = attachOnComplete(local, status -> { if ("OK".equals(status)) { state.setReadOnly(false); @@ -251,14 +234,14 @@ protected RedisCommand preProcessCommand(RedisCommand comm }); } - if (local.getType().toString().equals(DISCARD.name())) { + if (commandType.equals(DISCARD.name())) { if (multi != null) { multi.cancel(); multi = null; } } - if (local.getType().toString().equals(EXEC.name())) { + if (commandType.equals(EXEC.name())) { MultiOutput multiOutput = this.multi; this.multi = null; if (multiOutput == null) { @@ -267,10 +250,25 @@ protected RedisCommand preProcessCommand(RedisCommand comm local.setOutput((MultiOutput) multiOutput); } - if (multi != null && !local.getType().toString().equals(MULTI.name())) { + if (multi != null && !commandType.equals(MULTI.name()) && !commandType.equals(WATCH.name())) { + // ignore MULTI and WATCH commands nested in another MULTI local = new TransactionalCommand<>(local); multi.add(local); } + + if (commandType.equals(MULTI.name())) { + + multi = (multi == null ? new MultiOutput<>(codec) : multi); + + if (command instanceof CompleteableCommand) { + ((CompleteableCommand) command).onComplete((ignored, e) -> { + if (e != null) { + multi = null; + } + }); + } + } + return local; } diff --git a/src/test/java/io/lettuce/core/commands/TransactionCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/TransactionCommandIntegrationTests.java index c0f4b5f92d..800553c8fb 100644 --- a/src/test/java/io/lettuce/core/commands/TransactionCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/TransactionCommandIntegrationTests.java @@ -142,6 +142,30 @@ void errorInMulti() { assertThat((String) values.get(2)).isEqualTo(value); } + @Test + void errorWhileWatchInsideMulti() { + assertThat(redis.multi()).isEqualTo("OK"); + assertThat(redis.set(key, value)).isEqualTo(null); + assertThatThrownBy(() -> redis.watch(key)).isInstanceOf(RedisCommandExecutionException.class) + .hasMessageContaining("ERR WATCH inside MULTI is not allowed"); + assertThat(redis.get(key)).isEqualTo(null); + TransactionResult values = redis.exec(); + assertThat(values.wasDiscarded()).isFalse(); + assertThat((String) values.get(0)).isEqualTo("OK"); + assertThat((String) values.get(1)).isEqualTo(value); + } + + @Test + void errorWhileMultiInsideMulti() { + assertThat(redis.multi()).isEqualTo("OK"); + assertThat(redis.set(key, value)).isEqualTo(null); + assertThatThrownBy(redis::multi).isInstanceOf(RedisCommandExecutionException.class) + .hasMessageContaining("ERR MULTI calls can not be nested"); + assertThat(redis.get(key)).isEqualTo(null); + TransactionResult values = redis.exec(); + assertThat(values.wasDiscarded()).isFalse(); + } + @Test void execWithoutMulti() { assertThatThrownBy(redis::exec).isInstanceOf(RedisCommandExecutionException.class) diff --git a/src/test/java/io/lettuce/test/ReactiveSyncInvocationHandler.java b/src/test/java/io/lettuce/test/ReactiveSyncInvocationHandler.java index 758327e356..35b9b7be23 100644 --- a/src/test/java/io/lettuce/test/ReactiveSyncInvocationHandler.java +++ b/src/test/java/io/lettuce/test/ReactiveSyncInvocationHandler.java @@ -75,7 +75,8 @@ protected Object handleInvocation(Object proxy, Method method, Object[] args) th if (result instanceof Mono) { Mono mono = (Mono) result; - if (!method.getName().equals("exec") && !method.getName().equals("multi")) { + if (!method.getName().equals("exec") && !method.getName().equals("multi") + && !method.getName().equals("watch")) { if (connection instanceof StatefulRedisConnection && ((StatefulRedisConnection) connection).isMulti()) { mono.subscribe(); return null;