fix(server): debug populate consume less memory #4384

Merged · 4 commits · Jan 1, 2025
48 changes: 28 additions & 20 deletions src/server/debugcmd.cc
@@ -124,12 +124,13 @@ tuple<const CommandId*, absl::InlinedVector<string, 5>> GeneratePopulateCommand(
       args.push_back(GenerateValue(val_size, random_value, gen));
     }
   } else if (type == "JSON") {
-    cid = registry.Find("JSON.SET");
+    cid = registry.Find("JSON.MERGE");
     args.push_back("$");
 
     string json = "{";
     for (size_t i = 0; i < elements; ++i) {
-      absl::StrAppend(&json, "\"", i, "\":\"", GenerateValue(val_size, random_value, gen), "\",");
+      absl::StrAppend(&json, "\"", GenerateValue(val_size / 2, random_value, gen), "\":\"",
+                      GenerateValue(val_size / 2, random_value, gen), "\",");
    }
     json[json.size() - 1] = '}'; // Replace last ',' with '}'
     args.push_back(json);
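
Context for the change above: because DoPopulateBatch (second hunk below) now splits one key's population across several commands, JSON.SET with the "$" path would replace the whole document on every chunk, so only the last chunk's fields would survive; JSON.MERGE instead merges each chunk into the existing document, and the field names are randomized (with val_size split between name and value) so chunks do not collide. A minimal standalone sketch of the fragment one chunked command would send; RandomString is a hypothetical stand-in for the patch's GenerateValue and this is not the Dragonfly code itself:

#include <random>
#include <string>

// Hypothetical stand-in for GenerateValue(): a random alphanumeric
// string of the requested length.
std::string RandomString(size_t len, std::mt19937& gen) {
  static const char kAlnum[] = "abcdefghijklmnopqrstuvwxyz0123456789";
  std::uniform_int_distribution<size_t> dist(0, sizeof(kAlnum) - 2);
  std::string s;
  for (size_t i = 0; i < len; ++i)
    s.push_back(kAlnum[dist(gen)]);
  return s;
}

// One chunk's JSON fragment: random field names mean repeated
// JSON.MERGE calls on the same key keep adding distinct fields
// instead of rewriting the same indices 0..N-1 each time.
std::string BuildJsonFragment(size_t elements, size_t val_size, std::mt19937& gen) {
  std::string json = "{";
  for (size_t i = 0; i < elements; ++i) {
    json += "\"" + RandomString(val_size / 2, gen) + "\":\"" +
            RandomString(val_size / 2, gen) + "\",";
  }
  json[json.size() - 1] = '}';  // replace the trailing ',' as the patch does
  return json;
}
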
@@ -157,30 +157,37 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool
   absl::InlinedVector<string_view, 5> args_view;
   facade::CapturingReplyBuilder crb;
   ConnectionContext local_cntx{cntx, stub_tx.get()};
 
   absl::InsecureBitGen gen;
   for (unsigned i = 0; i < batch.sz; ++i) {
     string key = absl::StrCat(prefix, ":", batch.index[i]);
+    uint32_t elements_left = elements;
 
+    while (elements_left) {
+      // limit rss grow by 32K by limiting the element count in each command.
+      uint32_t max_batch_elements = std::max(32_KB / val_size, 1ULL);
+      uint32_t populate_elements = std::min(max_batch_elements, elements_left);
+      elements_left -= populate_elements;
+      auto [cid, args] =
+          GeneratePopulateCommand(type, key, val_size, random_value, populate_elements,
+                                  *sf->service().mutable_registry(), &gen);
+      if (!cid) {
+        LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?";
+        break;
+      }
 
-    auto [cid, args] = GeneratePopulateCommand(type, std::move(key), val_size, random_value,
-                                               elements, *sf->service().mutable_registry(), &gen);
-    if (!cid) {
-      LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?";
-      break;
-    }
-
-    args_view.clear();
-    for (auto& arg : args) {
-      args_view.push_back(arg);
-    }
-    auto args_span = absl::MakeSpan(args_view);
+      args_view.clear();
+      for (auto& arg : args) {
+        args_view.push_back(arg);
+      }
+      auto args_span = absl::MakeSpan(args_view);
 
-    stub_tx->MultiSwitchCmd(cid);
-    local_cntx.cid = cid;
-    crb.SetReplyMode(ReplyMode::NONE);
-    stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);
+      stub_tx->MultiSwitchCmd(cid);
+      local_cntx.cid = cid;
+      crb.SetReplyMode(ReplyMode::NONE);
+      stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);
 
-    sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
+      sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
+    }
   }
 
   local_tx->UnlockMulti();
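The loop above caps each generated command at roughly 32 KB of element payload, which is what keeps RSS from ballooning during DEBUG POPULATE. A self-contained sketch of that arithmetic with example numbers only (not the Dragonfly code):

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  // Mirrors the new DoPopulateBatch chunking with example parameters:
  // 32 KB cap per command, 100-byte values, 100'000 elements for one key.
  const uint64_t kCapBytes = 32 * 1024;
  const uint64_t val_size = 100;
  uint32_t elements_left = 100'000;

  unsigned commands = 0;
  while (elements_left) {
    uint32_t max_batch_elements =
        static_cast<uint32_t>(std::max<uint64_t>(kCapBytes / val_size, 1));
    uint32_t populate_elements = std::min(max_batch_elements, elements_left);
    elements_left -= populate_elements;
    ++commands;
  }
  // 100'000 elements at 327 per command -> ~306 commands, each holding
  // at most ~32 KB of generated values instead of ~10 MB in one shot.
  std::cout << "commands issued: " << commands << '\n';
}
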
3 changes: 2 additions & 1 deletion tests/dragonfly/memory_test.py
@@ -17,7 +17,7 @@
         ("ZSET", 250_000, 100, 100),
         ("LIST", 300_000, 100, 100),
         ("STRING", 3_500_000, 1000, 1),
-        ("STREAM", 260_000, 100, 100),
+        ("STREAM", 280_000, 100, 100),
     ],
 )
 # We limit to 5gb just in case to sanity check the gh runner. Otherwise, if we ask for too much
@@ -69,6 +69,7 @@ async def check_memory():
     await client.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs")
 
     await check_memory()
+    await client.execute_command("FLUSHALL")
 
 
 @pytest.mark.asyncio
14 changes: 4 additions & 10 deletions tests/dragonfly/snapshot_test.py
@@ -569,12 +569,7 @@ async def test_tiered_entries_throttle(async_client: aioredis.Redis):
 @dfly_args({"serialization_max_chunk_size": 4096, "proactor_threads": 1})
 @pytest.mark.parametrize(
     "cont_type",
-    [
-        ("HASH"),
-        ("SET"),
-        ("ZSET"),
-        ("LIST"),
-    ],
+    [("HASH"), ("SET"), ("ZSET"), ("LIST"), ("STREAM")],
 )
 @pytest.mark.slow
 async def test_big_value_serialization_memory_limit(df_factory, cont_type):
@@ -590,17 +585,16 @@ async def test_big_value_serialization_memory_limit(df_factory, cont_type):
     await client.execute_command(
         f"debug populate 1 prefix {element_size} TYPE {cont_type} RAND ELEMENTS {elements}"
     )
-    await asyncio.sleep(1)
 
     info = await client.info("ALL")
-    # rss double's because of DEBUG POPULATE
-    assert info["used_memory_peak_rss"] > (one_gb * 2)
+    assert info["used_memory_peak_rss"] < (one_gb * 1.2)
     # if we execute SAVE below without big value serialization we trigger the assertion below.
+    # note the peak would reach (one_gb * 3) without it.
     await client.execute_command("SAVE")
     info = await client.info("ALL")
 
-    upper_limit = 2_250_000_000 # 2.25 GB
-    assert info["used_memory_peak_rss"] < upper_limit
+    assert info["used_memory_peak_rss"] < (one_gb * 1.3)
 
     await client.execute_command("FLUSHALL")
     await client.close()