diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 326fd569ee76..3c72e57514d4 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -124,12 +124,13 @@ tuple> GeneratePopulateCommand( args.push_back(GenerateValue(val_size, random_value, gen)); } } else if (type == "JSON") { - cid = registry.Find("JSON.SET"); + cid = registry.Find("JSON.MERGE"); args.push_back("$"); string json = "{"; for (size_t i = 0; i < elements; ++i) { - absl::StrAppend(&json, "\"", i, "\":\"", GenerateValue(val_size, random_value, gen), "\","); + absl::StrAppend(&json, "\"", GenerateValue(val_size / 2, random_value, gen), "\":\"", + GenerateValue(val_size / 2, random_value, gen), "\","); } json[json.size() - 1] = '}'; // Replace last ',' with '}' args.push_back(json); @@ -157,30 +158,37 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool absl::InlinedVector args_view; facade::CapturingReplyBuilder crb; ConnectionContext local_cntx{cntx, stub_tx.get()}; - absl::InsecureBitGen gen; for (unsigned i = 0; i < batch.sz; ++i) { string key = absl::StrCat(prefix, ":", batch.index[i]); + uint32_t elements_left = elements; + + while (elements_left) { + // limit rss grow by 32K by limiting the element count in each command. + uint32_t max_batch_elements = std::max(32_KB / val_size, 1ULL); + uint32_t populate_elements = std::min(max_batch_elements, elements_left); + elements_left -= populate_elements; + auto [cid, args] = + GeneratePopulateCommand(type, key, val_size, random_value, populate_elements, + *sf->service().mutable_registry(), &gen); + if (!cid) { + LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?"; + break; + } - auto [cid, args] = GeneratePopulateCommand(type, std::move(key), val_size, random_value, - elements, *sf->service().mutable_registry(), &gen); - if (!cid) { - LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?"; - break; - } - - args_view.clear(); - for (auto& arg : args) { - args_view.push_back(arg); - } - auto args_span = absl::MakeSpan(args_view); + args_view.clear(); + for (auto& arg : args) { + args_view.push_back(arg); + } + auto args_span = absl::MakeSpan(args_view); - stub_tx->MultiSwitchCmd(cid); - local_cntx.cid = cid; - crb.SetReplyMode(ReplyMode::NONE); - stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span); + stub_tx->MultiSwitchCmd(cid); + local_cntx.cid = cid; + crb.SetReplyMode(ReplyMode::NONE); + stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span); - sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx); + sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx); + } } local_tx->UnlockMulti(); diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index 292275a2002c..37cd1131812a 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -17,7 +17,7 @@ ("ZSET", 250_000, 100, 100), ("LIST", 300_000, 100, 100), ("STRING", 3_500_000, 1000, 1), - ("STREAM", 260_000, 100, 100), + ("STREAM", 280_000, 100, 100), ], ) # We limit to 5gb just in case to sanity check the gh runner. Otherwise, if we ask for too much @@ -69,6 +69,7 @@ async def check_memory(): await client.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs") await check_memory() + await client.execute_command("FLUSHALL") @pytest.mark.asyncio diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index 81fd7f7fc00a..5101388bebbf 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -569,12 +569,7 @@ async def test_tiered_entries_throttle(async_client: aioredis.Redis): @dfly_args({"serialization_max_chunk_size": 4096, "proactor_threads": 1}) @pytest.mark.parametrize( "cont_type", - [ - ("HASH"), - ("SET"), - ("ZSET"), - ("LIST"), - ], + [("HASH"), ("SET"), ("ZSET"), ("LIST"), ("STREAM")], ) @pytest.mark.slow async def test_big_value_serialization_memory_limit(df_factory, cont_type): @@ -590,17 +585,16 @@ async def test_big_value_serialization_memory_limit(df_factory, cont_type): await client.execute_command( f"debug populate 1 prefix {element_size} TYPE {cont_type} RAND ELEMENTS {elements}" ) + await asyncio.sleep(1) info = await client.info("ALL") - # rss double's because of DEBUG POPULATE - assert info["used_memory_peak_rss"] > (one_gb * 2) + assert info["used_memory_peak_rss"] < (one_gb * 1.2) # if we execute SAVE below without big value serialization we trigger the assertion below. # note the peak would reach (one_gb * 3) without it. await client.execute_command("SAVE") info = await client.info("ALL") - upper_limit = 2_250_000_000 # 2.25 GB - assert info["used_memory_peak_rss"] < upper_limit + assert info["used_memory_peak_rss"] < (one_gb * 1.3) await client.execute_command("FLUSHALL") await client.close()