Skip to content

Commit

Permalink
add aggr tests
Browse files Browse the repository at this point in the history
Signed-off-by: Shoham Elias <[email protected]>
  • Loading branch information
shohamazon committed Dec 22, 2024
1 parent 83ba370 commit c110826
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 42 deletions.
11 changes: 5 additions & 6 deletions python/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async def create_client(
addresses: Optional[List[NodeAddress]] = None,
client_name: Optional[str] = None,
protocol: ProtocolVersion = ProtocolVersion.RESP3,
timeout: Optional[int] = 1000,
request_timeout: Optional[int] = 1000,
connection_timeout: Optional[int] = 1000,
cluster_mode_pubsub: Optional[
GlideClusterClientConfiguration.PubSubSubscriptions
Expand All @@ -269,7 +269,7 @@ async def create_client(
credentials=credentials,
client_name=client_name,
protocol=protocol,
request_timeout=timeout,
request_timeout=request_timeout,
pubsub_subscriptions=cluster_mode_pubsub,
inflight_requests_limit=inflight_requests_limit,
read_from=read_from,
Expand All @@ -288,8 +288,7 @@ async def create_client(
database_id=database_id,
client_name=client_name,
protocol=protocol,
request_timeout=timeout,
connection_timeout=connection_timeout,
request_timeout=request_timeout,
pubsub_subscriptions=standalone_mode_pubsub,
inflight_requests_limit=inflight_requests_limit,
read_from=read_from,
Expand Down Expand Up @@ -347,7 +346,7 @@ async def test_teardown(request, cluster_mode: bool, protocol: ProtocolVersion):
try:
# Try connecting without credentials
client = await create_client(
request, cluster_mode, protocol=protocol, timeout=2000
request, cluster_mode, protocol=protocol, request_timeout=2000
)
await client.custom_command(["FLUSHALL"])
await client.close()
Expand All @@ -360,7 +359,7 @@ async def test_teardown(request, cluster_mode: bool, protocol: ProtocolVersion):
request,
cluster_mode,
protocol=protocol,
timeout=2000,
request_timeout=2000,
credentials=credentials,
)
try:
Expand Down
91 changes: 55 additions & 36 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def test_register_client_name_and_version(self, glide_client: TGlideClient
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_send_and_receive_large_values(self, request, cluster_mode, protocol):
glide_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000
)
length = 2**25 # 33mb
key = "0" * length
Expand Down Expand Up @@ -302,7 +302,7 @@ async def test_statistics(self, glide_client: TGlideClient):
assert "total_clients" in stats
assert len(stats) == 2

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_connection_timeout(
self,
Expand All @@ -314,61 +314,74 @@ async def test_connection_timeout(
client = await create_client(
request,
cluster_mode,
# addresses=multiple_replicas_cluster.nodes_addr,
protocol=protocol,
timeout=2000,
request_timeout=2000,
connection_timeout=2000,
)

assert isinstance(client, (GlideClient, GlideClusterClient))

assert await client.set("key", "value") == "OK"

await client.close()

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_connection_timeout_when_running_long_script(
async def test_connection_timeout_when_client_is_blocked(
self,
request,
cluster_mode: bool,
protocol: ProtocolVersion,
):
# Create the first client to run a long-running Lua script
long_script_client = await create_client(
client = await create_client(
request,
cluster_mode,
protocol=protocol,
timeout=10000, # 10 seconds timeout for the script execution
request_timeout=20000, # 20 seconds timeout
)

# Define a long-running Lua script (e.g., sleeps for 10 seconds)
long_script_code = create_long_running_lua_script(5)
async def run_debug_sleep():
"""
Run a long-running DEBUG SLEEP command.
"""
command = ["DEBUG", "sleep", "7"]
if isinstance(client, GlideClusterClient):
await client.custom_command(command, AllNodes())
else:
await client.custom_command(command)

# Function to run the long-running script
async def run_long_script():
await long_script_client.invoke_script(Script(long_script_code))
async def fail_to_connect_to_client():
# try to connect with a small timeout connection
await asyncio.sleep(1)
print(f"{datetime.now()} here first")
with pytest.raises(ClosingError) as e:
await create_client(
request,
cluster_mode,
protocol=protocol,
connection_timeout=100, # 100 ms
)
assert "timed out" in str(e)

async def connect_to_client():
# Create a second client with a connection timeout of 5 seconds
# Create a second client with a connection timeout of 7 seconds
await asyncio.sleep(1)
timeout_client = await create_client(
request,
cluster_mode,
protocol=protocol,
timeout=5000, # General timeout for operations
connection_timeout=7000, # 5-second connection timeout
connection_timeout=10000, # 10-second connection timeout
)

# Ensure the second client can connect and perform a simple operation
assert await timeout_client.set("key", "value") == "OK"

await timeout_client.close()

# Run the long script and attempt to connect concurrently
await asyncio.gather(run_long_script(), connect_to_client())
# Run tests
await asyncio.gather(run_debug_sleep(), fail_to_connect_to_client())
await asyncio.gather(run_debug_sleep(), connect_to_client())

# Clean up both clients
await long_script_client.close()
# Clean up the main client
await client.close()


@pytest.mark.asyncio
Expand Down Expand Up @@ -5480,7 +5493,10 @@ async def test_xread_edge_cases_and_failures(
)

test_client = await create_client(
request=request, protocol=protocol, cluster_mode=cluster_mode, timeout=900
request=request,
protocol=protocol,
cluster_mode=cluster_mode,
request_timeout=900,
)
# ensure command doesn't time out even if timeout > request timeout
assert (
Expand Down Expand Up @@ -5873,7 +5889,10 @@ async def test_xreadgroup_edge_cases_and_failures(
)

test_client = await create_client(
request=request, protocol=protocol, cluster_mode=cluster_mode, timeout=900
request=request,
protocol=protocol,
cluster_mode=cluster_mode,
request_timeout=900,
)
timeout_key = f"{{testKey}}:{get_random_string(10)}"
timeout_group_name = get_random_string(10)
Expand Down Expand Up @@ -8401,11 +8420,11 @@ async def test_function_stats_running_script(

# create a second client to run fcall
test_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000
)

test_client2 = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000
)

async def endless_fcall_route_call():
Expand Down Expand Up @@ -8530,7 +8549,7 @@ async def test_function_kill_no_write(

# create a second client to run fcall
test_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=15000
)

async def endless_fcall_route_call():
Expand Down Expand Up @@ -8585,7 +8604,7 @@ async def test_function_kill_write_is_unkillable(

# create a second client to run fcall - and give it a long timeout
test_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=15000
)

# call fcall to run the function loaded function
Expand Down Expand Up @@ -10433,7 +10452,7 @@ async def test_script_binary(self, glide_client: TGlideClient):
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_script_large_keys_no_args(self, request, cluster_mode, protocol):
glide_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000
)
length = 2**13 # 8kb
key = "0" * length
Expand All @@ -10445,7 +10464,7 @@ async def test_script_large_keys_no_args(self, request, cluster_mode, protocol):
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_script_large_args_no_keys(self, request, cluster_mode, protocol):
glide_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000
)
length = 2**12 # 4kb
arg1 = "0" * length
Expand All @@ -10461,7 +10480,7 @@ async def test_script_large_args_no_keys(self, request, cluster_mode, protocol):
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_script_large_keys_and_args(self, request, cluster_mode, protocol):
glide_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=5000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=5000
)
length = 2**12 # 4kb
key = "0" * length
Expand Down Expand Up @@ -10545,7 +10564,7 @@ async def test_script_kill_route(

# Create a second client to run the script
test_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000
)

await script_kill_tests(glide_client, test_client, route)
Expand All @@ -10561,7 +10580,7 @@ async def test_script_kill_no_route(
):
# Create a second client to run the script
test_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000
)

await script_kill_tests(glide_client, test_client)
Expand All @@ -10573,12 +10592,12 @@ async def test_script_kill_unkillable(
):
# Create a second client to run the script
test_client = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=30000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=30000
)

# Create a second client to kill the script
test_client2 = await create_client(
request, cluster_mode=cluster_mode, protocol=protocol, timeout=15000
request, cluster_mode=cluster_mode, protocol=protocol, request_timeout=15000
)

# Add test for script_kill with writing script
Expand Down
2 changes: 2 additions & 0 deletions utils/cluster_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ def get_server_command() -> str:
"yes",
"--logfile",
logfile,
"--enable-debug-command",
"yes",
]
if load_module:
if len(load_module) == 0:
Expand Down

0 comments on commit c110826

Please sign in to comment.