From cfad808943590751f1b315754e4768d0d03f1fc9 Mon Sep 17 00:00:00 2001 From: Shoham Elias Date: Wed, 14 Aug 2024 14:15:59 +0000 Subject: [PATCH] Bump redis-rs + Route Function Stats to all nodes Signed-off-by: Shoham Elias --- CHANGELOG.md | 3 +- .../src/client/reconnecting_connection.rs | 2 +- glide-core/src/client/standalone_client.rs | 18 +++++++- .../glide/async_commands/cluster_commands.py | 12 +++--- .../async_commands/standalone_commands.py | 25 ++++++++--- .../glide/async_commands/transaction.py | 2 +- python/python/glide/constants.py | 10 ++++- python/python/tests/test_async_client.py | 41 +++++++++++++------ python/python/tests/utils/utils.py | 6 +-- submodules/redis-rs | 2 +- 10 files changed, 86 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4531e039ad..e4a5abd635 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,11 +67,12 @@ * Node: Added PUBSUB * commands ([#2090](https://github.com/valkey-io/valkey-glide/pull/2090)) * Python: Added PUBSUB * commands ([#2043](https://github.com/valkey-io/valkey-glide/pull/2043)) * Node: Added XGROUP CREATE & XGROUP DESTROY commands ([#2084](https://github.com/valkey-io/valkey-glide/pull/2084)) -* Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077)) +* Node: Added BZPOPMAX & BZPOPMIN command ([#2077]((https://github.com/valkey-io/valkey-glide/pull/2077))) * Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088)) #### Breaking Changes * Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005)) +* Core: Change FUNCTION STATS command to return multi node response for standalone mode #### Fixes * Java: Add overloads for XADD to allow duplicate entry keys ([#1970](https://github.com/valkey-io/valkey-glide/pull/1970)) diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index c76da9cf42..4d962e40dd 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -165,7 +165,7 @@ impl ReconnectingConnection { create_connection(backend, connection_retry_strategy, push_sender).await } - fn node_address(&self) -> String { + pub(crate) fn node_address(&self) -> String { self.inner .backend .connection_info diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index a8350651e9..f0d88d7178 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -312,7 +312,23 @@ impl StandaloneClient { Some(ResponsePolicy::CombineMaps) => future::try_join_all(requests) .await .and_then(cluster_routing::combine_map_results), - Some(ResponsePolicy::Special) | None => { + Some(ResponsePolicy::Special) => { + // Await all futures and collect results + let results = future::try_join_all(requests).await?; + let map_entries = self + .inner + .nodes + .iter() + .zip(results) + .map(|(node, result)| { + (redis::Value::BulkString(node.node_address().into()), result) + }) + .collect(); + + Ok(Value::Map(map_entries)) + } + + None => { // This is our assumption - if there's no coherent way to aggregate the responses, we just collect them in an array, and pass it to the user. // TODO - once Value::Error is merged, we can use join_all and report separate errors and also pass successes. future::try_join_all(requests).await.map(Value::Array) diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index f59cb4662d..d0735d392b 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -18,7 +18,7 @@ TClusterResponse, TEncodable, TFunctionListResponse, - TFunctionStatsResponse, + TFunctionStatsSingleNodeResponse, TResult, TSingleNodeRoute, ) @@ -587,7 +587,7 @@ async def fcall_ro_route( async def function_stats( self, route: Optional[Route] = None - ) -> TClusterResponse[TFunctionStatsResponse]: + ) -> TClusterResponse[TFunctionStatsSingleNodeResponse]: """ Returns information about the function that's currently running and information about the available execution engines. @@ -595,11 +595,11 @@ async def function_stats( See https://valkey.io/commands/function-stats/ for more details Args: - route (Optional[Route]): Specifies the routing configuration for the command. The client - will route the command to the nodes defined by `route`. + route (Optional[Route]):The command will be routed automatically to all nodes, unless `route` is provided, in which + case the client will route the command to the nodes defined by `route`. Defaults to None. Returns: - TClusterResponse[TFunctionStatsResponse]: A `Mapping` with two keys: + TClusterResponse[TFunctionStatsSingleNodeResponse]: A `Mapping` with two keys: - `running_script` with information about the running script. - `engines` with information about available engines and their stats. See example for more details. @@ -623,7 +623,7 @@ async def function_stats( Since: Valkey version 7.0.0. """ return cast( - TClusterResponse[TFunctionStatsResponse], + TClusterResponse[TFunctionStatsSingleNodeResponse], await self._execute_command(RequestType.FunctionStats, [], route), ) diff --git a/python/python/glide/async_commands/standalone_commands.py b/python/python/glide/async_commands/standalone_commands.py index 37815e14c4..98380c271c 100644 --- a/python/python/glide/async_commands/standalone_commands.py +++ b/python/python/glide/async_commands/standalone_commands.py @@ -18,7 +18,7 @@ TOK, TEncodable, TFunctionListResponse, - TFunctionStatsResponse, + TFunctionStatsFullResponse, TResult, ) from glide.protobuf.command_request_pb2 import RequestType @@ -390,7 +390,7 @@ async def function_kill(self) -> TOK: await self._execute_command(RequestType.FunctionKill, []), ) - async def function_stats(self) -> TFunctionStatsResponse: + async def function_stats(self) -> TFunctionStatsFullResponse: """ Returns information about the function that's currently running and information about the available execution engines. @@ -398,14 +398,14 @@ async def function_stats(self) -> TFunctionStatsResponse: See https://valkey.io/commands/function-stats/ for more details Returns: - TFunctionStatsResponse: A `Mapping` with two keys: + TFunctionStatsFullResponse: A Map where the key is the node adrress anf the value is a Map of two keys: - `running_script` with information about the running script. - `engines` with information about available engines and their stats. See example for more details. Examples: >>> await client.function_stats() - { + {b"addr": { 'running_script': { 'name': 'foo', 'command': ['FCALL', 'foo', '0', 'hello'], @@ -417,12 +417,25 @@ async def function_stats(self) -> TFunctionStatsResponse: 'functions_count': 1, } } - } + }, + b"addr2": { + 'running_script': { + 'name': 'foo', + 'command': ['FCALL', 'foo', '0', 'hello'], + 'duration_ms': 7758 + }, + 'engines': { + 'LUA': { + 'libraries_count': 1, + 'functions_count': 1, + } + } + }} Since: Valkey version 7.0.0. """ return cast( - TFunctionStatsResponse, + TFunctionStatsFullResponse, await self._execute_command(RequestType.FunctionStats, []), ) diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 1573d51c03..7eb9600bdf 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -2028,7 +2028,7 @@ def function_stats(self: TTransaction) -> TTransaction: See https://valkey.io/commands/function-stats/ for more details Command Response: - TFunctionStatsResponse: A `Mapping` with two keys: + TFunctionStatsSingleNodeResponse: A `Mapping` with two keys: - `running_script` with information about the running script. - `engines` with information about available engines and their stats. See example for more details. diff --git a/python/python/glide/constants.py b/python/python/glide/constants.py index 24c30d8de7..6c208d5376 100644 --- a/python/python/glide/constants.py +++ b/python/python/glide/constants.py @@ -42,15 +42,21 @@ Union[bytes, List[Mapping[bytes, Union[bytes, Set[bytes]]]]], ] ] -TFunctionStatsResponse = Mapping[ +TFunctionStatsSingleNodeResponse = Mapping[ bytes, Union[ None, Mapping[ - bytes, Union[Mapping[bytes, Mapping[bytes, int]], bytes, int, List[bytes]] + bytes, + Union[Mapping[bytes, Mapping[bytes, int]], bytes, int, List[bytes]], ], ], ] +TFunctionStatsFullResponse = Mapping[ + bytes, + TFunctionStatsSingleNodeResponse, +] + TXInfoStreamResponse = Mapping[ bytes, Union[bytes, int, Mapping[bytes, Optional[List[List[bytes]]]]] diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index d46e490b70..80255004fe 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -76,7 +76,7 @@ ProtocolVersion, ServerCredentials, ) -from glide.constants import OK, TEncodable, TFunctionStatsResponse, TResult +from glide.constants import OK, TEncodable, TFunctionStatsSingleNodeResponse, TResult from glide.exceptions import TimeoutError as GlideTimeoutError from glide.glide_client import GlideClient, GlideClusterClient, TGlideClient from glide.routes import ( @@ -7346,6 +7346,11 @@ async def test_bitop(self, glide_client: TGlideClient): with pytest.raises(RequestError): await glide_client.bitop(BitwiseOperation.AND, destination, [set_key]) + @pytest.mark.parametrize("cluster_mode", [True]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_shoham(self, glide_client: GlideClusterClient): + print(await glide_client.function_stats(RandomNode())) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_bitfield(self, glide_client: TGlideClient): @@ -8222,7 +8227,10 @@ async def test_function_stats(self, glide_client: GlideClient): assert await glide_client.function_load(code, True) == lib_name.encode() response = await glide_client.function_stats() - check_function_stats_response(response, [], 1, 1) + for node_response in response.values(): + check_function_stats_response( + cast(TFunctionStatsSingleNodeResponse, node_response), [], 1, 1 + ) code = generate_lua_lib_code( lib_name + "_2", @@ -8234,12 +8242,18 @@ async def test_function_stats(self, glide_client: GlideClient): ) response = await glide_client.function_stats() - check_function_stats_response(response, [], 2, 3) + for node_response in response.values(): + check_function_stats_response( + cast(TFunctionStatsSingleNodeResponse, node_response), [], 2, 3 + ) assert await glide_client.function_flush(FlushMode.SYNC) == OK response = await glide_client.function_stats() - check_function_stats_response(response, [], 0, 0) + for node_response in response.values(): + check_function_stats_response( + cast(TFunctionStatsSingleNodeResponse, node_response), [], 0, 0 + ) @pytest.mark.parametrize("cluster_mode", [True]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) @@ -8259,7 +8273,7 @@ async def test_function_stats_cluster(self, glide_client: GlideClusterClient): response = await glide_client.function_stats() for node_response in response.values(): check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 1, 1 + cast(TFunctionStatsSingleNodeResponse, node_response), [], 1, 1 ) code = generate_lua_lib_code( @@ -8274,7 +8288,7 @@ async def test_function_stats_cluster(self, glide_client: GlideClusterClient): response = await glide_client.function_stats() for node_response in response.values(): check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 2, 3 + cast(TFunctionStatsSingleNodeResponse, node_response), [], 2, 3 ) assert await glide_client.function_flush(FlushMode.SYNC) == OK @@ -8282,7 +8296,7 @@ async def test_function_stats_cluster(self, glide_client: GlideClusterClient): response = await glide_client.function_stats() for node_response in response.values(): check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 0, 0 + cast(TFunctionStatsSingleNodeResponse, node_response), [], 0, 0 ) @pytest.mark.parametrize("cluster_mode", [True]) @@ -8311,12 +8325,12 @@ async def test_function_stats_with_routing( response = await glide_client.function_stats(route) if single_route: check_function_stats_response( - cast(TFunctionStatsResponse, response), [], 1, 1 + cast(TFunctionStatsSingleNodeResponse, response), [], 1, 1 ) else: for node_response in response.values(): check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 1, 1 + cast(TFunctionStatsSingleNodeResponse, node_response), [], 1, 1 ) code = generate_lua_lib_code( @@ -8332,12 +8346,12 @@ async def test_function_stats_with_routing( response = await glide_client.function_stats(route) if single_route: check_function_stats_response( - cast(TFunctionStatsResponse, response), [], 2, 3 + cast(TFunctionStatsSingleNodeResponse, response), [], 2, 3 ) else: for node_response in response.values(): check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 2, 3 + cast(TFunctionStatsSingleNodeResponse, node_response), [], 2, 3 ) assert await glide_client.function_flush(FlushMode.SYNC, route) == OK @@ -8345,12 +8359,12 @@ async def test_function_stats_with_routing( response = await glide_client.function_stats(route) if single_route: check_function_stats_response( - cast(TFunctionStatsResponse, response), [], 0, 0 + cast(TFunctionStatsSingleNodeResponse, response), [], 0, 0 ) else: for node_response in response.values(): check_function_stats_response( - cast(TFunctionStatsResponse, node_response), [], 0, 0 + cast(TFunctionStatsSingleNodeResponse, node_response), [], 0, 0 ) @pytest.mark.parametrize("cluster_mode", [True, False]) @@ -10234,3 +10248,4 @@ async def test_script_large_keys_and_args(self, request, cluster_mode, protocol) == key.encode() ) await glide_client.close() + await glide_client.close() diff --git a/python/python/tests/utils/utils.py b/python/python/tests/utils/utils.py index 96f08a7b5a..691bf98c42 100644 --- a/python/python/tests/utils/utils.py +++ b/python/python/tests/utils/utils.py @@ -8,7 +8,7 @@ from glide.constants import ( TClusterResponse, TFunctionListResponse, - TFunctionStatsResponse, + TFunctionStatsSingleNodeResponse, TResult, ) from glide.glide_client import TGlideClient @@ -309,7 +309,7 @@ def check_function_list_response( def check_function_stats_response( - response: TFunctionStatsResponse, + response: TFunctionStatsSingleNodeResponse, running_function: List[bytes], lib_count: int, function_count: int, @@ -318,7 +318,7 @@ def check_function_stats_response( Validate whether `FUNCTION STATS` response contains required info. Args: - response (TFunctionStatsResponse): The response from server. + response (TFunctionStatsSingleNodeResponse): The response from server. running_function (List[bytes]): Command line of running function expected. Empty, if nothing expected. lib_count (int): Expected libraries count. function_count (int): Expected functions count. diff --git a/submodules/redis-rs b/submodules/redis-rs index de53b2b5c6..b43a07e7f7 160000 --- a/submodules/redis-rs +++ b/submodules/redis-rs @@ -1 +1 @@ -Subproject commit de53b2b5c68e7ef4667e2b195eb9b1d0dd460722 +Subproject commit b43a07e7f76e3f341661b95b40df0d60ba6f89f8