Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: properly track json memory usage #3641

Merged
merged 15 commits into from
Sep 18, 2024
74 changes: 52 additions & 22 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ extern "C" {

ABSL_RETIRED_FLAG(bool, use_set2, true, "If true use DenseSet for an optimized set data structure");

ABSL_FLAG(bool, experimental_flat_json, false, "If true uses flat json implementation.");

namespace dfly {
using namespace std;
using absl::GetFlag;
Expand Down Expand Up @@ -520,6 +522,12 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, MemoryResour

} // namespace detail

// Returns the json encoding selected for this process: kEncodingJsonFlat when the
// --experimental_flat_json flag is set, kEncodingJsonCons (jsoncons tree) otherwise.
// NOTE: "Enconding" is a historical misspelling kept because it is the declared API name.
uint32_t JsonEnconding() {
  // Cached per thread on first call; flipping the flag afterwards has no effect
  // for threads that already read it.
  static thread_local uint32_t json_enc =
      absl::GetFlag(FLAGS_experimental_flat_json) ? kEncodingJsonFlat : kEncodingJsonCons;
  return json_enc;
}

using namespace std;

auto CompactObj::GetStats() -> Stats {
Expand Down Expand Up @@ -575,7 +583,11 @@ size_t CompactObj::Size() const {
raw_size = u_.r_obj.Size();
break;
case JSON_TAG:
raw_size = u_.json_obj.json_len;
if (JsonEnconding() == kEncodingJsonFlat) {
raw_size = u_.json_obj.flat.json_len;
} else {
raw_size = u_.json_obj.cons.json_ptr->size();
}
break;
case SBF_TAG:
raw_size = u_.sbf->current_size();
Expand Down Expand Up @@ -680,32 +692,45 @@ std::optional<int64_t> CompactObj::TryGetInt() const {

// Returns the jsoncons JSON object held by this CompactObj, or nullptr when the
// object is not of type OBJ_JSON. A non-null result is only valid for the
// kEncodingJsonCons representation (set via SetJson(JsonType&&)); flat json has
// no JsonType tree to return.
auto CompactObj::GetJson() const -> JsonType* {
  if (ObjType() == OBJ_JSON) {
    DCHECK_EQ(JsonEnconding(), kEncodingJsonCons);
    return u_.json_obj.cons.json_ptr;
  }
  return nullptr;
}

// Stores a jsoncons JSON object (kEncodingJsonCons), taking ownership of `j`.
// Memory accounting (cons.bytes_used) is NOT updated here; callers adjust it
// afterwards via SetJsonSize()/JsonMemTracker.
void CompactObj::SetJson(JsonType&& j) {
  if (taglen_ == JSON_TAG && JsonEnconding() == kEncodingJsonCons) {
    // Already holds a json tree: reuse the existing allocation and swap contents.
    DCHECK(u_.json_obj.cons.json_ptr != nullptr);  // must be allocated
    u_.json_obj.cons.json_ptr->swap(j);
    // We deliberately do not set bytes_used here, as it is not needed.
    // Consider the two following cases:
    // 1. old json contains 50 bytes. The delta for the new one is 50, so the total bytes
    //    the new json occupies is 100.
    // 2. old json contains 100 bytes. The delta for the new one is -50, so the total bytes
    //    the new json occupies is 50.
    // Both of the cases are covered in SetJsonSize and JsonMemTracker. See below.
    return;
  }

  // Fresh object: allocate the tree from the memory resource and start the
  // usage counter at 0; the caller accounts the real usage via SetJsonSize.
  SetMeta(JSON_TAG);
  u_.json_obj.cons.json_ptr = AllocateMR<JsonType>(std::move(j));
  u_.json_obj.cons.bytes_used = 0;
}

// Adjusts the tracked heap usage (in bytes) of a cons-encoded json object by the
// signed delta `size`. No-op for non-json values and for flat json.
void CompactObj::SetJsonSize(int64_t size) {
  if (taglen_ == JSON_TAG && JsonEnconding() == kEncodingJsonCons) {
    // JSON.SET or if mem hasn't changed from a JSON op then we just update.
    if (size < 0) {
      // A negative delta must not underflow the unsigned counter:
      // bytes_used + size >= 0  <=>  bytes_used >= -size.
      // (Checking `bytes_used >= size` would be vacuously true for negative size.)
      DCHECK(static_cast<int64_t>(u_.json_obj.cons.bytes_used) >= -size);
    }
    u_.json_obj.cons.bytes_used += size;
  }
}

// Stores a flat (serialized) json blob of `len` bytes (kEncodingJsonFlat),
// copying from `buf` into memory obtained from the thread-local memory resource.
void CompactObj::SetJson(const uint8_t* buf, size_t len) {
  SetMeta(JSON_TAG);
  u_.json_obj.flat.flat_ptr = (uint8_t*)tl.local_mr->allocate(len, kAlignSize);
  memcpy(u_.json_obj.flat.flat_ptr, buf, len);
  u_.json_obj.flat.json_len = len;
}

void CompactObj::SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor) {
Expand Down Expand Up @@ -1013,10 +1038,10 @@ void CompactObj::Free() {
u_.small_str.Free();
} else if (taglen_ == JSON_TAG) {
DVLOG(1) << "Freeing JSON object";
if (u_.json_obj.encoding == kEncodingJsonCons) {
DeleteMR<JsonType>(u_.json_obj.json_ptr);
if (JsonEnconding() == kEncodingJsonCons) {
DeleteMR<JsonType>(u_.json_obj.cons.json_ptr);
} else {
tl.local_mr->deallocate(u_.json_obj.flat_ptr, u_.json_obj.json_len, kAlignSize);
tl.local_mr->deallocate(u_.json_obj.flat.flat_ptr, u_.json_obj.flat.json_len, kAlignSize);
}
} else if (taglen_ == SBF_TAG) {
DeleteMR<SBF>(u_.sbf);
Expand All @@ -1036,8 +1061,13 @@ size_t CompactObj::MallocUsed() const {
}

if (taglen_ == JSON_TAG) {
DCHECK(u_.json_obj.json_ptr != nullptr);
return zmalloc_size(u_.json_obj.json_ptr);
// TODO fix this once we fully support flat json
// This is here because accessing a union field that is not active
// is UB.
if (JsonEnconding() == kEncodingJsonFlat) {
return 0;
}
return u_.json_obj.cons.bytes_used;
}

if (taglen_ == SMALL_TAG) {
Expand Down
23 changes: 18 additions & 5 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "base/pmr/memory_resource.h"
#include "core/json/json_object.h"
#include "core/mi_memory_resource.h"
#include "core/small_string.h"
#include "core/string_or_view.h"

Expand Down Expand Up @@ -101,6 +102,8 @@ using CompactObjType = unsigned;

constexpr CompactObjType kInvalidCompactObjType = std::numeric_limits<CompactObjType>::max();

uint32_t JsonEnconding();

class CompactObj {
static constexpr unsigned kInlineLen = 16;

Expand Down Expand Up @@ -309,6 +312,8 @@ class CompactObj {
// into here, no copying is allowed!
void SetJson(JsonType&& j);
void SetJson(const uint8_t* buf, size_t len);
// Adjusts the size used by json
void SetJsonSize(int64_t size);

// pre condition - the type here is OBJ_JSON and was set with SetJson
JsonType* GetJson() const;
Expand Down Expand Up @@ -445,13 +450,21 @@ class CompactObj {
};
} __attribute__((packed));

struct JsonConsT {
JsonType* json_ptr;
size_t bytes_used;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not reduce it to uint32_t and keep the encoding inside the object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint32_t limits us to 4 GB. It's extreme to expect any object would take more than that, but you never know. As for encoding, I don't think it makes much sense to store it. It's fixed once it's set, and that's also one of the reasons I refactored it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also use uint48:=uint8_t[6]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for storing it per object is that I expect we would want a heuristic at some point for encoding, managed per object basis. like we do with hash maps, sets etc. flat is awful for mutable operations, for example. we do not have to do it now though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to pack this in the beginning, and in fact I already mentioned to @adiholden that if in the future we wanted more space on the json we store within a compact object, we could get some. However, I thought that size_t suffices for now, and until we actually need this there is no reason to take the bitfield route.

Copy link
Contributor Author

@kostasrim kostasrim Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if we ever want a heuristic, how many bits would that require? 4-8 max? For that use case we will just make the bytes_used (and the length for flat json) a bitfield and mask the last n bits we want to use for encoding. It would be a simple change that would serve our need.

What I am trying to say here is I already thought about us needing a few more bits in the future and it's more than doable :)

};

struct FlatJsonT {
  // Length in bytes of the serialized flat-json buffer.
  uint32_t json_len;
  // Buffer allocated from the thread-local memory resource; freed in CompactObj::Free().
  uint8_t* flat_ptr;
};

struct JsonWrapper {
union {
JsonType* json_ptr;
uint8_t* flat_ptr;
JsonConsT cons;
FlatJsonT flat;
};
uint32_t json_len = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not keep both fields common for both encodings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO len is how many characters. The other one is how many bytes. I find it a little clearer 🤷

uint8_t encoding = 0;
};

// My main data structure. Union of representations.
Expand All @@ -475,7 +488,7 @@ class CompactObj {
} u_;

//
static_assert(sizeof(u_) == 16, "");
static_assert(sizeof(u_) == 16);

uint8_t mask_ = 0;

Expand Down
82 changes: 68 additions & 14 deletions src/server/json_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
#include <jsoncons_ext/jsonpointer/jsonpointer.hpp>
#include <jsoncons_ext/mergepatch/mergepatch.hpp>

#include "absl/cleanup/cleanup.h"
#include "base/flags.h"
#include "base/logging.h"
#include "core/flatbuffers.h"
#include "core/json/json_object.h"
#include "core/json/path.h"
#include "core/mi_memory_resource.h"
#include "facade/cmd_arg_parser.h"
#include "facade/op_status.h"
#include "server/acl/acl_commands_def.h"
Expand All @@ -36,7 +38,6 @@
ABSL_FLAG(bool, jsonpathv2, true,
"If true uses Dragonfly jsonpath implementation, "
"otherwise uses legacy jsoncons implementation.");
ABSL_FLAG(bool, experimental_flat_json, false, "If true uses flat json implementation.");

namespace dfly {

Expand All @@ -51,6 +52,34 @@ using CI = CommandId;

namespace {

// Samples the used() size of the thread-local mi-malloc heap on construction and,
// via SetJsonSize(), charges the allocation delta observed since then to a json
// PrimeValue so that CompactObj::MallocUsed() reports accurate numbers.
class JsonMemTracker {
 public:
  JsonMemTracker() {
    start_size_ = static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used();
  }

  // Charges the heap-usage delta since construction to `pv`.
  // `is_op_set` is true when JSON.SET or JSON.MSET was called: that path recreates
  // the object with bytes_used == 0, so the usable size of the json tree itself
  // must be added on top of the allocation delta.
  void SetJsonSize(PrimeValue& pv, bool is_op_set) {
    const size_t current = static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used();
    int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_);
    // If the diff is 0 it means the object uses the same memory as before. No action needed.
    if (diff == 0) {
      return;
    }
    // Blind update: account the pointer's usable size as well, since bytes_used
    // was reset to 0 by SetJson.
    if (is_op_set) {
      diff += static_cast<int64_t>(mi_usable_size(pv.GetJson()));
    }
    pv.SetJsonSize(diff);
    // Under any flow we must not end up with this special value.
    DCHECK(pv.MallocUsed() != 0);
  }

 private:
  size_t start_size_{0};
};

template <typename T> using ParseResult = io::Result<T, std::string>;

ParseResult<JsonExpression> ParseJsonPathAsExpression(std::string_view path) {
Expand Down Expand Up @@ -287,7 +316,7 @@ bool JsonAreEquals(const JsonType& lhs, const JsonType& rhs) {
}
}

facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& value) {
OpResult<DbSlice::AddOrFindResult> SetJson(const OpArgs& op_args, string_view key, JsonType value) {
auto& db_slice = op_args.GetDbSlice();

auto op_res = db_slice.AddOrFind(op_args.db_cntx, key);
Expand All @@ -297,7 +326,7 @@ facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& valu

op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second);

if (absl::GetFlag(FLAGS_experimental_flat_json)) {
if (JsonEnconding() == kEncodingJsonFlat) {
flexbuffers::Builder fbb;
json::FromJsonType(value, &fbb);
fbb.Finish();
Expand All @@ -307,7 +336,7 @@ facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& valu
res.it->second.SetJson(std::move(value));
}
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second);
return OpStatus::OK;
return std::move(res);
}

string JsonTypeToName(const JsonType& val) {
Expand Down Expand Up @@ -572,6 +601,8 @@ OpResult<JsonCallbackResult<optional<T>>> JsonMutateOperation(const OpArgs& op_a
auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON);
RETURN_ON_BAD_STATUS(it_res);

JsonMemTracker mem_tracker;

PrimeValue& pv = it_res->it->second;

JsonType* json_val = pv.GetJson();
Expand All @@ -587,6 +618,8 @@ OpResult<JsonCallbackResult<optional<T>>> JsonMutateOperation(const OpArgs& op_a
options.verify_op(*json_val);
}

// we need to manually run this before the PostUpdater run
mem_tracker.SetJsonSize(pv, false);
it_res->post_updater.Run();
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);

Expand Down Expand Up @@ -856,15 +889,23 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path,
auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately
return static_cast<long>(db_slice.Del(op_args.db_cntx, it));
}
OpResult<JsonType*> result = GetJson(op_args, key);
if (!result) {
JsonMemTracker tracker;
// FindMutable because we need to run the AutoUpdater at the end which will account
// the deltas calculated from the MemoryTracker
auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON);
if (!it_res) {
return 0;
}

PrimeValue& pv = it_res->it->second;
JsonType* json_val = pv.GetJson();

absl::Cleanup update_size_on_exit([tracker, &pv]() mutable { tracker.SetJsonSize(pv, false); });

if (json_path.HoldsJsonPath()) {
const json::Path& path = json_path.AsJsonPath();
long deletions = json::MutatePath(
path, [](optional<string_view>, JsonType* val) { return true; }, *result);
path, [](optional<string_view>, JsonType* val) { return true; }, json_val);
return deletions;
}

Expand All @@ -874,7 +915,7 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path,
return {};
};

auto res = json_path.Mutate<Nothing>(result.value(), std::move(cb));
auto res = json_path.Mutate<Nothing>(json_val, std::move(cb));
RETURN_ON_BAD_STATUS(res);

if (deletion_items.empty()) {
Expand All @@ -892,7 +933,7 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path,
}

std::error_code ec;
jsoncons::jsonpatch::apply_patch(*result.value(), patch, ec);
jsoncons::jsonpatch::apply_patch(*json_val, patch, ec);
if (ec) {
VLOG(1) << "Failed to apply patch on json with error: " << ec.message();
return 0;
Expand Down Expand Up @@ -1262,10 +1303,18 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
}
}

OpStatus st = SetJson(op_args, key, std::move(parsed_json.value()));
if (st != OpStatus::OK) {
return st;
JsonMemTracker mem_tracker;
// We need to deep copy parsed_json.value() and not use move! The reason is that otherwise
// it's really difficult to properly track memory deltas because even if we move below,
// the deallocation of parsed_json won't happen in the scope of SetJson but in the scope
// of this function. Because of this, the memory tracking will be off. Another solution here,
// is to use absl::Cleanup and dispatch another Find() but that's too complicated because then
// you need to take into account the order of destructors.
OpResult<DbSlice::AddOrFindResult> st = SetJson(op_args, key, parsed_json.value());
if (st.status() != OpStatus::OK) {
return st.status();
}
mem_tracker.SetJsonSize(st->it->second, st->is_new);
return true;
}

Expand All @@ -1281,6 +1330,9 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
path_exists = true;
if (!is_nx_condition) {
operation_result = true;
static_assert(
std::is_same_v<std::allocator_traits<JsonType>::propagate_on_container_copy_assignment,
std::false_type>);
*val = new_json;
}
return {};
Expand Down Expand Up @@ -1310,9 +1362,13 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
return OpStatus::OK;
};

// JsonMutateOperation uses it's own JsonMemTracker. It will work, because updates to already
// existing json keys use copy assign, so we don't really need to account for the memory
// allocated by ShardJsonFromString above since it's not being moved here at all.
auto res = JsonMutateOperation<Nothing>(op_args, key, json_path, std::move(cb),
MutateOperationOptions{std::move(inserter), {}});
RETURN_ON_BAD_STATUS(res);

return operation_result;
}

Expand Down Expand Up @@ -1364,8 +1420,6 @@ OpStatus OpMSet(const OpArgs& op_args, const ShardArgs& args) {
// implemented yet.
OpStatus OpMerge(const OpArgs& op_args, string_view key, string_view path,
const WrappedJsonPath& json_path, std::string_view json_str) {
// DCHECK(!json_path.HoldsJsonPath());

std::optional<JsonType> parsed_json = ShardJsonFromString(json_str);
if (!parsed_json) {
VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved";
Expand Down
Loading
Loading