Skip to content

Commit

Permalink
Added RedisCE cluster support for the JSON.MGET and JSON.MSET commands
Browse files Browse the repository at this point in the history
  • Loading branch information
tishun committed Aug 26, 2024
1 parent fd842b3 commit 69d843c
Show file tree
Hide file tree
Showing 35 changed files with 872 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,7 @@ public RedisFuture<List<JsonValue<K, V>>> jsonMGet(JsonPath jsonPath, K... keys)
}

@Override
public RedisFuture<String> jsonMSet(JsonMsetArgs... arguments) {
public RedisFuture<String> jsonMSet(List<JsonMsetArgs<K, V>> arguments) {
return dispatch(jsonCommandBuilder.jsonMSet(arguments));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import io.lettuce.core.codec.Base16;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.json.JsonParser;
import io.lettuce.core.json.JsonParserRegistry;
import io.lettuce.core.json.JsonPath;
import io.lettuce.core.json.JsonValue;
import io.lettuce.core.json.arguments.JsonGetArgs;
Expand Down Expand Up @@ -1583,7 +1581,7 @@ public Flux<JsonValue<K, V>> jsonMGet(JsonPath jsonPath, K... keys) {
}

@Override
public Mono<String> jsonMSet(JsonMsetArgs... arguments) {
public Mono<String> jsonMSet(List<JsonMsetArgs<K, V>> arguments) {
return createMono(() -> jsonCommandBuilder.jsonMSet(arguments));
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/lettuce/core/RedisJsonCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,13 @@ Command<K, V, List<JsonValue<K, V>>> jsonMGet(JsonPath jsonPath, K... keys) {
return createCommand(JSON_MGET, new JsonValueListOutput<>(codec), args);
}

Command<K, V, String> jsonMSet(JsonMsetArgs... arguments) {
Command<K, V, String> jsonMSet(List<JsonMsetArgs<K, V>> arguments) {

notEmpty(arguments);
notEmpty(arguments.toArray());

CommandArgs<K, V> args = new CommandArgs<>(codec);

for (JsonMsetArgs argument : arguments) {
for (JsonMsetArgs<K, V> argument : arguments) {
argument.build(args);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,6 @@
* All rights reserved.
*
* Licensed under the MIT License.
*
* This file contains contributions from third-party contributors
* licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.api.async;

Expand Down Expand Up @@ -202,7 +189,7 @@ public interface RedisJsonAsyncCommands<K, V> {
* @return "OK" if the operation was successful, error otherwise
* @since 6.5
*/
RedisFuture<String> jsonMSet(JsonMsetArgs... arguments);
RedisFuture<String> jsonMSet(List<JsonMsetArgs<K, V>> arguments);

/**
* Increment the number value stored at the specified {@link JsonPath} in the JSON document by the provided increment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,10 @@
* All rights reserved.
*
* Licensed under the MIT License.
*
* This file contains contributions from third-party contributors
* licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.api.reactive;

import java.util.List;
import io.lettuce.core.json.JsonPath;
import io.lettuce.core.json.JsonValue;
import io.lettuce.core.json.arguments.JsonGetArgs;
Expand Down Expand Up @@ -202,7 +190,7 @@ public interface RedisJsonReactiveCommands<K, V> {
* @return "OK" if the operation was successful, error otherwise
* @since 6.5
*/
Mono<String> jsonMSet(JsonMsetArgs... arguments);
Mono<String> jsonMSet(List<JsonMsetArgs<K, V>> arguments);

/**
* Increment the number value stored at the specified {@link JsonPath} in the JSON document by the provided increment.
Expand Down
15 changes: 1 addition & 14 deletions src/main/java/io/lettuce/core/api/sync/RedisJsonCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,6 @@
* All rights reserved.
*
* Licensed under the MIT License.
*
* This file contains contributions from third-party contributors
* licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.api.sync;

Expand Down Expand Up @@ -201,7 +188,7 @@ public interface RedisJsonCommands<K, V> {
* @return "OK" if the operation was successful, error otherwise
* @since 6.5
*/
String jsonMSet(JsonMsetArgs... arguments);
String jsonMSet(List<JsonMsetArgs<K, V>> arguments);

/**
* Increment the number value stored at the specified {@link JsonPath} in the JSON document by the provided increment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
Expand All @@ -52,6 +53,11 @@
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.json.JsonParser;
import io.lettuce.core.json.JsonParserRegistry;
import io.lettuce.core.json.JsonPath;
import io.lettuce.core.json.JsonValue;
import io.lettuce.core.json.arguments.JsonMsetArgs;
import io.lettuce.core.output.IntegerOutput;
import io.lettuce.core.output.KeyStreamingChannel;
import io.lettuce.core.output.KeyValueStreamingChannel;
Expand All @@ -67,6 +73,7 @@
* @param <V> Value type.
* @author Mark Paluch
* @author Jon Chambers
* @author Tihomir Mateev
* @since 3.3
*/
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -282,6 +289,54 @@ public RedisFuture<Long> keys(KeyStreamingChannel<K> channel, K pattern) {
return MultiNodeExecution.aggregateAsync(executions);
}

@Override
public RedisFuture<List<JsonValue<K, V>>> jsonMGet(JsonPath jsonPath, K... keys) {
Map<Integer, List<K>> partitioned = SlotHash.partition(codec, Arrays.asList(keys));

if (partitioned.size() < 2) {
return super.jsonMGet(jsonPath, keys);
}

// For a given partition, maps the key to its index within the List<K> in partitioned for faster lookups below
Map<Integer, Map<K, Integer>> keysToIndexes = mapKeyToIndex(partitioned);
Map<K, Integer> slots = SlotHash.getSlots(partitioned);
Map<Integer, RedisFuture<List<JsonValue<K, V>>>> executions = new HashMap<>(partitioned.size());

for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
K[] partitionKeys = entry.getValue().toArray((K[]) new Object[entry.getValue().size()]);
RedisFuture<List<JsonValue<K, V>>> jsonMget = super.jsonMGet(jsonPath, partitionKeys);
executions.put(entry.getKey(), jsonMget);
}

// restore order of key
return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
List<JsonValue<K, V>> result = new ArrayList<>(slots.size());
for (K opKey : keys) {
int slot = slots.get(opKey);

int position = keysToIndexes.get(slot).get(opKey);
RedisFuture<List<JsonValue<K, V>>> listRedisFuture = executions.get(slot);
result.add(MultiNodeExecution.execute(() -> listRedisFuture.get().get(position)));
}

return result;
});
}

private Map<Integer, Map<K, Integer>> mapKeyToIndex(Map<Integer, List<K>> partitioned) {
Map<Integer, Map<K, Integer>> result = new HashMap<>(partitioned.size());
for (Integer partition : partitioned.keySet()) {
List<K> keysForPartition = partitioned.get(partition);
Map<K, Integer> keysToIndexes = new HashMap<>(keysForPartition.size());
for (int i = 0; i < keysForPartition.size(); i++) {
keysToIndexes.put(keysForPartition.get(i), i);
}
result.put(partition, keysToIndexes);
}

return result;
}

@Override
public RedisFuture<List<KeyValue<K, V>>> mget(K... keys) {
return mget(Arrays.asList(keys));
Expand All @@ -296,15 +351,7 @@ public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
}

// For a given partition, maps the key to its index within the List<K> in partitioned for faster lookups below
Map<Integer, Map<K, Integer>> partitionedKeysToIndexes = new HashMap<>(partitioned.size());
for (Integer partition : partitioned.keySet()) {
List<K> keysForPartition = partitioned.get(partition);
Map<K, Integer> keysToIndexes = new HashMap<>(keysForPartition.size());
for (int i = 0; i < keysForPartition.size(); i++) {
keysToIndexes.put(keysForPartition.get(i), i);
}
partitionedKeysToIndexes.put(partition, keysToIndexes);
}
Map<Integer, Map<K, Integer>> partitionedKeysToIndexes = mapKeyToIndex(partitioned);
Map<K, Integer> slots = SlotHash.getSlots(partitioned);
Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>(partitioned.size());

Expand Down Expand Up @@ -351,6 +398,28 @@ public RedisFuture<Long> mget(KeyValueStreamingChannel<K, V> channel, Iterable<K
return MultiNodeExecution.aggregateAsync(executions);
}

@Override
public RedisFuture<String> jsonMSet(List<JsonMsetArgs<K, V>> arguments) {
List<K> keys = arguments.stream().map(JsonMsetArgs::getKey).collect(Collectors.toList());
Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);

if (partitioned.size() < 2) {
return super.jsonMSet(arguments);
}

Map<Integer, RedisFuture<String>> executions = new HashMap<>();

for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
entry.getValue().forEach(k -> {
RedisFuture<String> mset = super.jsonMSet(
arguments.stream().filter(args -> args.getKey().equals(k)).collect(Collectors.toList()));
executions.put(entry.getKey(), mset);
});
}

return MultiNodeExecution.firstOfAsync(executions);
}

@Override
public RedisFuture<String> mset(Map<K, V> map) {

Expand Down Expand Up @@ -692,4 +761,14 @@ static <T extends ScanCursor, K, V> RedisFuture<T> clusterScan(StatefulRedisClus
return mapper.map(nodeIds, currentNodeId, scanCursor);
}

@Override
public JsonParser<K, V> getJsonParser() {
return JsonParserRegistry.getJsonParser(this.codec);
}

@Override
public void setJsonParser(JsonParser<K, V> jsonParser) {
throw new UnsupportedOperationException("Setting a custom JsonParser is not supported");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public interface NodeSelectionJsonAsyncCommands<K, V> {
* @return "OK" if the operation was successful, error otherwise
* @since 6.5
*/
AsyncExecutions<String> jsonMSet(JsonMsetArgs... arguments);
AsyncExecutions<String> jsonMSet(List<JsonMsetArgs<K, V>> arguments);

/**
* Increment the number value stored at the specified {@link JsonPath} in the JSON document by the provided increment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.lettuce.core.Range;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.*;
import io.lettuce.core.json.JsonParser;

/**
* A complete asynchronous and thread-safe cluster Redis API with 400+ Methods.
Expand Down Expand Up @@ -380,4 +381,8 @@ public interface RedisClusterAsyncCommands<K, V> extends BaseRedisAsyncCommands<
*/
RedisFuture<String> readWrite();

JsonParser<K, V> getJsonParser();

void setJsonParser(JsonParser<K, V> jsonParser);

}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public interface NodeSelectionJsonCommands<K, V> {
* @return "OK" if the operation was successful, error otherwise
* @since 6.5
*/
Executions<String> jsonMSet(JsonMsetArgs... arguments);
Executions<String> jsonMSet(List<JsonMsetArgs<K, V>> arguments);

/**
* Increment the number value stored at the specified {@link JsonPath} in the JSON document by the provided increment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import io.lettuce.core.Range;
import io.lettuce.core.api.sync.*;
import io.lettuce.core.json.JsonParser;

/**
* A complete synchronous and thread-safe Redis Cluster API with 400+ Methods.
Expand All @@ -34,10 +35,11 @@
* @author dengliming
* @since 4.0
*/
public interface RedisClusterCommands<K, V> extends BaseRedisCommands<K, V>, RedisAclCommands<K, V>,
RedisFunctionCommands<K, V>, RedisGeoCommands<K, V>, RedisHashCommands<K, V>, RedisHLLCommands<K, V>,
RedisKeyCommands<K, V>, RedisListCommands<K, V>, RedisScriptingCommands<K, V>, RedisServerCommands<K, V>,
RedisSetCommands<K, V>, RedisSortedSetCommands<K, V>, RedisStreamCommands<K, V>, RedisStringCommands<K, V> {
public interface RedisClusterCommands<K, V>
extends BaseRedisCommands<K, V>, RedisAclCommands<K, V>, RedisFunctionCommands<K, V>, RedisGeoCommands<K, V>,
RedisHashCommands<K, V>, RedisHLLCommands<K, V>, RedisKeyCommands<K, V>, RedisListCommands<K, V>,
RedisScriptingCommands<K, V>, RedisServerCommands<K, V>, RedisSetCommands<K, V>, RedisSortedSetCommands<K, V>,
RedisStreamCommands<K, V>, RedisStringCommands<K, V>, RedisJsonCommands<K, V> {

/**
* Set the default timeout for operations. A zero timeout value indicates to not time out.
Expand Down Expand Up @@ -368,4 +370,8 @@ public interface RedisClusterCommands<K, V> extends BaseRedisCommands<K, V>, Red
@Override
String readWrite();

JsonParser<K, V> getJsonParser();

void setJsonParser(JsonParser<K, V> jsonParser);

}
13 changes: 0 additions & 13 deletions src/main/java/io/lettuce/core/json/JsonObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,6 @@
* All rights reserved.
*
* Licensed under the MIT License.
*
* This file contains contributions from third-party contributors
* licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.lettuce.core.json;
Expand Down
Loading

0 comments on commit 69d843c

Please sign in to comment.