From c7f496de9ed3b6e49f2ee086dd83705f1c4ceb79 Mon Sep 17 00:00:00 2001
From: adi_holden
Date: Mon, 30 Dec 2024 11:37:40 +0200
Subject: [PATCH 1/4] fix server: debug populate consume less memory

Signed-off-by: adi_holden
---
 src/server/debugcmd.cc           | 39 ++++++++++++++++++--------------
 tests/dragonfly/snapshot_test.py |  7 +++----
 2 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 326fd569ee76..13bbbd9bbf07 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -161,26 +161,31 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool
   absl::InsecureBitGen gen;
   for (unsigned i = 0; i < batch.sz; ++i) {
     string key = absl::StrCat(prefix, ":", batch.index[i]);
+    int32_t elements_left = elements;
+    while (elements_left) {
+      int32_t populate_elements = std::min(5, elements_left);
+      elements_left -= populate_elements;
+      auto [cid, args] =
+          GeneratePopulateCommand(type, key, val_size, random_value, populate_elements,
+                                  *sf->service().mutable_registry(), &gen);
+      if (!cid) {
+        LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?";
+        break;
+      }
 
-    auto [cid, args] = GeneratePopulateCommand(type, std::move(key), val_size, random_value,
-                                               elements, *sf->service().mutable_registry(), &gen);
-    if (!cid) {
-      LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?";
-      break;
-    }
-
-    args_view.clear();
-    for (auto& arg : args) {
-      args_view.push_back(arg);
-    }
-    auto args_span = absl::MakeSpan(args_view);
+      args_view.clear();
+      for (auto& arg : args) {
+        args_view.push_back(arg);
+      }
+      auto args_span = absl::MakeSpan(args_view);
 
-    stub_tx->MultiSwitchCmd(cid);
-    local_cntx.cid = cid;
-    crb.SetReplyMode(ReplyMode::NONE);
-    stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);
+      stub_tx->MultiSwitchCmd(cid);
+      local_cntx.cid = cid;
+      crb.SetReplyMode(ReplyMode::NONE);
+      stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);
 
-    sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
+      sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
+    }
   }
 
   local_tx->UnlockMulti();
diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py
index 81fd7f7fc00a..a6ae402668e6 100644
--- a/tests/dragonfly/snapshot_test.py
+++ b/tests/dragonfly/snapshot_test.py
@@ -590,17 +590,16 @@ async def test_big_value_serialization_memory_limit(df_factory, cont_type):
     await client.execute_command(
         f"debug populate 1 prefix {element_size} TYPE {cont_type} RAND ELEMENTS {elements}"
     )
+    await asyncio.sleep(1)
     info = await client.info("ALL")
 
-    # rss double's because of DEBUG POPULATE
-    assert info["used_memory_peak_rss"] > (one_gb * 2)
+    assert info["used_memory_peak_rss"] < (one_gb * 1.2)
 
     # if we execute SAVE below without big value serialization we trigger the assertion below.
     # note the peak would reach (one_gb * 3) without it.
     await client.execute_command("SAVE")
     info = await client.info("ALL")
-    upper_limit = 2_250_000_000  # 2.25 GB
-    assert info["used_memory_peak_rss"] < upper_limit
+    assert info["used_memory_peak_rss"] < (one_gb * 1.3)
 
     await client.execute_command("FLUSHALL")
     await client.close()

From 402b66359f942e8ce118e32f7a4fcd8b2b45f24d Mon Sep 17 00:00:00 2001
From: adi_holden
Date: Mon, 30 Dec 2024 16:04:43 +0200
Subject: [PATCH 2/4] fix json debug populate

Signed-off-by: adi_holden
---
 src/server/debugcmd.cc         | 5 +++--
 tests/dragonfly/memory_test.py | 1 +
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 13bbbd9bbf07..9b23f4f1e307 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -124,12 +124,13 @@ tuple> GeneratePopulateCommand(
       args.push_back(GenerateValue(val_size, random_value, gen));
     }
   } else if (type == "JSON") {
-    cid = registry.Find("JSON.SET");
+    cid = registry.Find("JSON.MERGE");
     args.push_back("$");
 
     string json = "{";
     for (size_t i = 0; i < elements; ++i) {
-      absl::StrAppend(&json, "\"", i, "\":\"", GenerateValue(val_size, random_value, gen), "\",");
+      absl::StrAppend(&json, "\"", GenerateValue(val_size / 2, random_value, gen), "\":\"",
+                      GenerateValue(val_size / 2, random_value, gen), "\",");
     }
     json[json.size() - 1] = '}';  // Replace last ',' with '}'
     args.push_back(json);
diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py
index 292275a2002c..12e65078ed3a 100644
--- a/tests/dragonfly/memory_test.py
+++ b/tests/dragonfly/memory_test.py
@@ -69,6 +69,7 @@ async def check_memory():
 
     await client.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs")
     await check_memory()
+    await client.execute_command("FLUSHALL")
 
 
 @pytest.mark.asyncio

From 647d5b855c97f20babaa55c9f757d3b60508207f Mon Sep 17 00:00:00 2001
From: adi_holden
Date: Mon, 30 Dec 2024 21:08:40 +0200
Subject: [PATCH 3/4] fix test rss gap with streams

Signed-off-by: adi_holden
---
 src/server/debugcmd.cc         | 8 +++++---
 tests/dragonfly/memory_test.py | 2 +-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 9b23f4f1e307..3c72e57514d4 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -158,13 +158,15 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool
   absl::InlinedVector args_view;
   facade::CapturingReplyBuilder crb;
   ConnectionContext local_cntx{cntx, stub_tx.get()};
-
   absl::InsecureBitGen gen;
   for (unsigned i = 0; i < batch.sz; ++i) {
     string key = absl::StrCat(prefix, ":", batch.index[i]);
-    int32_t elements_left = elements;
+    uint32_t elements_left = elements;
+
     while (elements_left) {
-      int32_t populate_elements = std::min(5, elements_left);
+      // limit rss grow by 32K by limiting the element count in each command.
+      uint32_t max_batch_elements = std::max(32_KB / val_size, 1ULL);
+      uint32_t populate_elements = std::min(max_batch_elements, elements_left);
       elements_left -= populate_elements;
       auto [cid, args] =
           GeneratePopulateCommand(type, key, val_size, random_value, populate_elements,
diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py
index 12e65078ed3a..37cd1131812a 100644
--- a/tests/dragonfly/memory_test.py
+++ b/tests/dragonfly/memory_test.py
@@ -17,7 +17,7 @@
         ("ZSET", 250_000, 100, 100),
         ("LIST", 300_000, 100, 100),
         ("STRING", 3_500_000, 1000, 1),
-        ("STREAM", 260_000, 100, 100),
+        ("STREAM", 280_000, 100, 100),
     ],
 )
 # We limit to 5gb just in case to sanity check the gh runner. Otherwise, if we ask for too much

From efc5271362522dd610902c3db03569092d52eed6 Mon Sep 17 00:00:00 2001
From: adi_holden
Date: Mon, 30 Dec 2024 22:58:41 +0200
Subject: [PATCH 4/4] add stream again after rebase

Signed-off-by: adi_holden
---
 tests/dragonfly/snapshot_test.py | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py
index a6ae402668e6..5101388bebbf 100644
--- a/tests/dragonfly/snapshot_test.py
+++ b/tests/dragonfly/snapshot_test.py
@@ -569,12 +569,7 @@ async def test_tiered_entries_throttle(async_client: aioredis.Redis):
 @dfly_args({"serialization_max_chunk_size": 4096, "proactor_threads": 1})
 @pytest.mark.parametrize(
     "cont_type",
-    [
-        ("HASH"),
-        ("SET"),
-        ("ZSET"),
-        ("LIST"),
-    ],
+    [("HASH"), ("SET"), ("ZSET"), ("LIST"), ("STREAM")],
 )
 @pytest.mark.slow
 async def test_big_value_serialization_memory_limit(df_factory, cont_type):
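
Not part of the patch series itself: a rough standalone C++ sketch of the batching rule that patches 1 and 3 introduce. DEBUG POPULATE no longer materializes all requested ELEMENTS of a key in one huge command; it splits them into chunks whose payload is capped at roughly 32 KiB. The 32 KiB cap and the min/max arithmetic mirror the diff above, while main(), the example values, and the printed summary are illustrative only and not Dragonfly code.

  #include <algorithm>
  #include <cstdint>
  #include <cstdio>

  int main() {
    const uint64_t kBatchBytes = 32 * 1024;  // payload cap per generated command (32 KiB)
    const uint64_t val_size = 500;           // example: bytes per element
    const uint32_t elements = 1000;          // example: ELEMENTS requested for one key

    // At least one element per command, otherwise cap the chunk by payload size.
    const uint32_t max_batch_elements =
        static_cast<uint32_t>(std::max<uint64_t>(kBatchBytes / val_size, 1));

    uint32_t elements_left = elements;
    unsigned commands = 0;
    while (elements_left) {
      uint32_t populate_elements = std::min(max_batch_elements, elements_left);
      elements_left -= populate_elements;
      ++commands;  // in the real code this is one generated RPUSH/SADD/HSET/... invocation
    }
    std::printf("%u elements -> %u commands of at most %u elements each\n", elements,
                commands, max_batch_elements);
    return 0;
  }

With these example numbers the sketch prints "1000 elements -> 16 commands of at most 65 elements each", which is the effect the relaxed rss assertions in snapshot_test.py and memory_test.py lean on: peak memory grows by one small batch at a time rather than by the whole value.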