fix: properly track json memory usage #3641
Changes from all commits: 3c5f6e6, f664fc3, b601171, fbb1af3, ef16cd8, 15022ff, ba4a9fb, 4e4c2b0, 8852049, 4807d9e, 690455c, 43553c7, c07ea48, 56282f9, 6371388
@@ -12,6 +12,7 @@
#include "base/pmr/memory_resource.h"
#include "core/json/json_object.h"
#include "core/mi_memory_resource.h"
#include "core/small_string.h"
#include "core/string_or_view.h"

@@ -101,6 +102,8 @@ using CompactObjType = unsigned;

constexpr CompactObjType kInvalidCompactObjType = std::numeric_limits<CompactObjType>::max();

uint32_t JsonEnconding();

class CompactObj {
  static constexpr unsigned kInlineLen = 16;

@@ -309,6 +312,8 @@ class CompactObj {
  // into here, no copying is allowed!
  void SetJson(JsonType&& j);
  void SetJson(const uint8_t* buf, size_t len);
  // Adjusts the size used by json
  void SetJsonSize(int64_t size);

  // pre condition - the type here is OBJ_JSON and was set with SetJson
  JsonType* GetJson() const;

@@ -445,13 +450,21 @@ class CompactObj {
    };
  } __attribute__((packed));

  struct JsonConsT {
    JsonType* json_ptr;
    size_t bytes_used;
  };

  struct FlatJsonT {
    uint32_t json_len;
    uint8_t* flat_ptr;
  };

  struct JsonWrapper {
    union {
      JsonType* json_ptr;
      uint8_t* flat_ptr;
      JsonConsT cons;
      FlatJsonT flat;
    };
    uint32_t json_len = 0;

Review comment: why not keep both fields common for both encodings?

Reply: IMO len is how many characters. The other one is how many bytes. I find it a little clearer 🤷

    uint8_t encoding = 0;
  };

  // My main data structure. Union of representations.

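To make the new layout concrete, here is a minimal, self-contained sketch of the same tagged-union idea (hypothetical names, not the PR's code): the cons encoding carries a heap pointer plus the byte count the tracker adjusts, the flat encoding carries a serialized buffer plus its length, and a small tag records which member is active.

#include <cstddef>
#include <cstdint>

struct ConsPayload {        // heap-allocated, jsoncons-style object (hypothetical)
  void* json_ptr;
  size_t bytes_used;        // adjusted by a memory tracker after mutations
};

struct FlatPayload {        // serialized flexbuffer blob (hypothetical)
  uint32_t json_len;
  uint8_t* flat_ptr;
};

struct Wrapper {
  union {
    ConsPayload cons;
    FlatPayload flat;
  };
  uint8_t encoding = 0;     // which union member is active; fixed once the value is set
};

int main() {
  Wrapper w;
  w.cons = {nullptr, 128};  // cons-encoded value currently accounted at 128 bytes
  w.encoding = 0;
  return w.cons.bytes_used == 128 ? 0 : 1;
}
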
@@ -475,7 +488,7 @@ class CompactObj {
  } u_;

  //
-  static_assert(sizeof(u_) == 16, "");
+  static_assert(sizeof(u_) == 16);

  uint8_t mask_ = 0;

@@ -15,11 +15,13 @@
#include <jsoncons_ext/jsonpointer/jsonpointer.hpp>
#include <jsoncons_ext/mergepatch/mergepatch.hpp>

#include "absl/cleanup/cleanup.h"
#include "base/flags.h"
#include "base/logging.h"
#include "core/flatbuffers.h"
#include "core/json/json_object.h"
#include "core/json/path.h"
#include "core/mi_memory_resource.h"
#include "facade/cmd_arg_parser.h"
#include "facade/op_status.h"
#include "server/acl/acl_commands_def.h"

@@ -36,7 +38,6 @@
ABSL_FLAG(bool, jsonpathv2, true,
          "If true uses Dragonfly jsonpath implementation, "
          "otherwise uses legacy jsoncons implementation.");
-ABSL_FLAG(bool, experimental_flat_json, false, "If true uses flat json implementation.");

namespace dfly {

@@ -51,6 +52,34 @@ using CI = CommandId;

namespace {

class JsonMemTracker {
 public:
  JsonMemTracker() {
    start_size_ = static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used();
  }

  void SetJsonSize(PrimeValue& pv, bool is_op_set) {
    const size_t current = static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used();
    int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_);
    // If the diff is 0 it means the object uses the same memory as before. No action needed.
    if (diff == 0) {
      return;
    }
    // If is_op_set is true, it means JSON.SET or JSON.MSET was called. This is a blind update,
    // and because the operation sets the size to 0 we also need to include the size of
    // the pointer.
    if (is_op_set) {
      diff += static_cast<int64_t>(mi_usable_size(pv.GetJson()));
    }
    pv.SetJsonSize(diff);
    // Under any flow we must not end up with this special value.
    DCHECK(pv.MallocUsed() != 0);

Review comment: Maybe worth adding …

  }

 private:
  size_t start_size_{0};
};

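The tracker above is essentially a before/after snapshot of the shard's allocator counter. A self-contained toy version of the idea (hypothetical CountingHeap standing in for MiMemoryResource, not the PR's code):

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <new>

// Toy allocator that counts live bytes, standing in for MiMemoryResource::used().
struct CountingHeap {
  size_t used = 0;
  void* Allocate(size_t n) { used += n; return ::operator new(n); }
  void Free(void* p, size_t n) { used -= n; ::operator delete(p); }
};

int main() {
  CountingHeap heap;
  void* existing = heap.Allocate(32);  // memory the value already owned before the mutation

  size_t start = heap.used;            // analogous to start_size_ in the tracker's constructor
  void* grown = heap.Allocate(64);     // the "mutation" allocates 64 more bytes
  int64_t diff = static_cast<int64_t>(heap.used) - static_cast<int64_t>(start);
  std::printf("delta attributed to the value: %lld bytes\n", static_cast<long long>(diff));  // 64

  heap.Free(grown, 64);
  heap.Free(existing, 32);
  return 0;
}
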
template <typename T> using ParseResult = io::Result<T, std::string>;

ParseResult<JsonExpression> ParseJsonPathAsExpression(std::string_view path) {

@@ -287,7 +316,7 @@ bool JsonAreEquals(const JsonType& lhs, const JsonType& rhs) {
  }
}

-facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& value) {
+OpResult<DbSlice::AddOrFindResult> SetJson(const OpArgs& op_args, string_view key, JsonType value) {
  auto& db_slice = op_args.GetDbSlice();

  auto op_res = db_slice.AddOrFind(op_args.db_cntx, key);

@@ -297,7 +326,7 @@ facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& valu

  op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second);

-  if (absl::GetFlag(FLAGS_experimental_flat_json)) {
+  if (JsonEnconding() == kEncodingJsonFlat) {
    flexbuffers::Builder fbb;
    json::FromJsonType(value, &fbb);
    fbb.Finish();

@@ -307,7 +336,7 @@ facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& valu
    res.it->second.SetJson(std::move(value));
  }
  op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second);
-  return OpStatus::OK;
+  return std::move(res);
}

string JsonTypeToName(const JsonType& val) {

@@ -572,6 +601,8 @@ OpResult<JsonCallbackResult<optional<T>>> JsonMutateOperation(const OpArgs& op_a
  auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON);
  RETURN_ON_BAD_STATUS(it_res);

  JsonMemTracker mem_tracker;

  PrimeValue& pv = it_res->it->second;

  JsonType* json_val = pv.GetJson();

@@ -587,6 +618,8 @@ OpResult<JsonCallbackResult<optional<T>>> JsonMutateOperation(const OpArgs& op_a
    options.verify_op(*json_val);
  }

  // We need to manually run this before the PostUpdater runs.
  mem_tracker.SetJsonSize(pv, false);
  it_res->post_updater.Run();
  op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);

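The ordering in the hunk above (record the delta, then run the post-updater) is the whole point of the comment. A self-contained sketch of why it matters, with hypothetical stand-ins for PrimeValue and the DbSlice post-updater:

#include <cstdint>
#include <cstdio>

struct Value {
  int64_t reported_bytes = 0;  // what SetJsonSize-style accounting stored on the value
};

struct PostUpdater {
  Value* v;
  int64_t* table_usage;
  // Accounts whatever the value reports at the moment it runs.
  void Run() { *table_usage += v->reported_bytes; }
};

int main() {
  Value v;
  int64_t table_usage = 0;
  PostUpdater updater{&v, &table_usage};

  int64_t delta = 48;        // delta measured by a JsonMemTracker-like helper
  v.reported_bytes = delta;  // must be stored first...
  updater.Run();             // ...so the table-level accounting picks it up
  std::printf("table usage: %lld\n", static_cast<long long>(table_usage));  // prints 48
  return 0;
}
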
@@ -856,15 +889,23 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path,
    auto it = db_slice.FindMutable(op_args.db_cntx, key).it;  // post_updater will run immediately
    return static_cast<long>(db_slice.Del(op_args.db_cntx, it));
  }
-  OpResult<JsonType*> result = GetJson(op_args, key);
-  if (!result) {
+  JsonMemTracker tracker;
+  // FindMutable because we need to run the AutoUpdater at the end, which will account for
+  // the deltas calculated from the MemoryTracker.
+  auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON);
+  if (!it_res) {
    return 0;
  }

+  PrimeValue& pv = it_res->it->second;
+  JsonType* json_val = pv.GetJson();
+
+  absl::Cleanup update_size_on_exit([tracker, &pv]() mutable { tracker.SetJsonSize(pv, false); });
+
  if (json_path.HoldsJsonPath()) {
    const json::Path& path = json_path.AsJsonPath();
    long deletions = json::MutatePath(
-        path, [](optional<string_view>, JsonType* val) { return true; }, *result);
+        path, [](optional<string_view>, JsonType* val) { return true; }, json_val);
    return deletions;
  }

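OpDel has several early returns, which is why the delta is recorded through absl::Cleanup rather than at a single exit point. A minimal sketch of that pattern (real absl::Cleanup API, hypothetical function):

#include <cstdio>

#include "absl/cleanup/cleanup.h"

// The callback fires on every exit path of the scope, mirroring update_size_on_exit above.
int DeleteItems(bool nothing_to_do) {
  int deleted = 0;
  absl::Cleanup record_on_exit = [&deleted] { std::printf("recorded %d deletions\n", deleted); };
  if (nothing_to_do) {
    return 0;  // the cleanup still runs here
  }
  deleted = 2;
  return deleted;  // and here
}

int main() {
  DeleteItems(true);
  DeleteItems(false);
  return 0;
}
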
@@ -874,7 +915,7 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path,
    return {};
  };

-  auto res = json_path.Mutate<Nothing>(result.value(), std::move(cb));
+  auto res = json_path.Mutate<Nothing>(json_val, std::move(cb));
  RETURN_ON_BAD_STATUS(res);

  if (deletion_items.empty()) {

@@ -892,7 +933,7 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path,
  }

  std::error_code ec;
-  jsoncons::jsonpatch::apply_patch(*result.value(), patch, ec);
+  jsoncons::jsonpatch::apply_patch(*json_val, patch, ec);
  if (ec) {
    VLOG(1) << "Failed to apply patch on json with error: " << ec.message();
    return 0;

@@ -1262,10 +1303,18 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
    }
  }

-  OpStatus st = SetJson(op_args, key, std::move(parsed_json.value()));
-  if (st != OpStatus::OK) {
-    return st;
+  JsonMemTracker mem_tracker;
+  // We need to deep copy parsed_json.value() and not use move! The reason is that otherwise
+  // it's really difficult to properly track memory deltas, because even if we move below,
+  // the deallocation of parsed_json won't happen in the scope of SetJson but in the scope
+  // of this function. Because of this, the memory tracking would be off. Another solution here
+  // is to use absl::Cleanup and dispatch another Find(), but that's too complicated because then
+  // you need to take into account the order of destructors.
+  OpResult<DbSlice::AddOrFindResult> st = SetJson(op_args, key, parsed_json.value());
+  if (st.status() != OpStatus::OK) {
+    return st.status();
  }
+  mem_tracker.SetJsonSize(st->it->second, st->is_new);
  return true;
}

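The deep-copy comment above is the subtle part of this hunk. A self-contained illustration of the effect (hypothetical FakeJson and counter, not the PR's code): a value parsed before the tracker starts contributes nothing to the tracked window if it is moved into the store, so the delta comes out as zero; copying inside the window makes the delta match what the stored object actually owns.

#include <cstdint>
#include <cstdio>
#include <utility>

static int64_t used_bytes = 0;  // stands in for MiMemoryResource::used()

struct FakeJson {
  int64_t owned = 0;  // bytes this object "allocated"
  explicit FakeJson(int64_t n) : owned(n) { used_bytes += owned; }
  FakeJson(const FakeJson& o) : owned(o.owned) { used_bytes += owned; }  // deep copy allocates
  FakeJson(FakeJson&& o) noexcept : owned(o.owned) { o.owned = 0; }      // move allocates nothing
  ~FakeJson() { used_bytes -= owned; }
};

int main() {
  FakeJson parsed(100);  // parsed before the tracker starts, as in OpSet

  int64_t start = used_bytes;  // JsonMemTracker-style snapshot
  FakeJson stored_copy(parsed);
  std::printf("copy delta: %lld\n", static_cast<long long>(used_bytes - start));  // prints 100

  start = used_bytes;
  FakeJson stored_move(std::move(parsed));
  std::printf("move delta: %lld\n", static_cast<long long>(used_bytes - start));  // prints 0
  return 0;
}
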
@@ -1281,6 +1330,9 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
    path_exists = true;
    if (!is_nx_condition) {
      operation_result = true;
+      static_assert(
+          std::is_same_v<std::allocator_traits<JsonType>::propagate_on_container_copy_assignment,
+                         std::false_type>);
      *val = new_json;
    }
    return {};

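The static_assert above pins down the allocator trait the copy relies on. For reference, the same trait demonstrated with standard std::pmr types (not the PR's JsonType): because propagate_on_container_copy_assignment is false for polymorphic_allocator, copy assignment copies the bytes but keeps the destination's memory resource.

#include <cassert>
#include <memory>
#include <memory_resource>
#include <string>
#include <type_traits>

int main() {
  using Alloc = std::pmr::polymorphic_allocator<char>;
  // The trait checked by the PR's static_assert, shown here for polymorphic_allocator.
  static_assert(std::is_same_v<
                std::allocator_traits<Alloc>::propagate_on_container_copy_assignment,
                std::false_type>);

  std::pmr::monotonic_buffer_resource pool;
  std::pmr::string dst{"destination string, long enough to allocate", &pool};
  std::pmr::string src{"source string, also long enough to allocate"};  // default resource

  dst = src;  // copies the characters, keeps dst's allocator (and therefore `pool`)
  assert(dst.get_allocator().resource() == &pool);
  return 0;
}
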
@@ -1310,9 +1362,13 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
    return OpStatus::OK;
  };

+  // JsonMutateOperation uses its own JsonMemTracker. It will work because updates to already
+  // existing json keys use copy assign, so we don't really need to account for the memory
+  // allocated by ShardJsonFromString above since it's not being moved here at all.
  auto res = JsonMutateOperation<Nothing>(op_args, key, json_path, std::move(cb),
                                          MutateOperationOptions{std::move(inserter), {}});
  RETURN_ON_BAD_STATUS(res);

  return operation_result;
}

@@ -1364,8 +1420,6 @@ OpStatus OpMSet(const OpArgs& op_args, const ShardArgs& args) {
// implemented yet.
OpStatus OpMerge(const OpArgs& op_args, string_view key, string_view path,
                 const WrappedJsonPath& json_path, std::string_view json_str) {
-  // DCHECK(!json_path.HoldsJsonPath());
-
  std::optional<JsonType> parsed_json = ShardJsonFromString(json_str);
  if (!parsed_json) {
    VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved";

Review discussion (on the new bytes_used field):

why not reduce it to uint32_t and keep the encoding inside the object?

uint32_t limits us to 4GB. It's extreme to expect any object would take more than that, but you never know. As for encoding, I don't think it makes much sense to store this. It's fixed once it's set, and that's also one of the reasons I refactored it.

we can also use uint48 := uint8_t[6]

The reason for storing it per object is that I expect we would want a heuristic at some point for encoding, managed on a per-object basis, like we do with hash maps, sets etc. Flat is awful for mutable operations, for example. We do not have to do it now though.

I wanted to pack this in the beginning, and in fact I already mentioned to @adiholden that if in the future we wanted more space on the json we store within a compact object, we could get some. However, I thought that the size_t suffices for now, and until we actually need this there is no reason to take the bitfield route.

Also, if we ever want a heuristic, how many bits would that require? 4-8 max? For that use case we will just make the bytes_used (and the length for flat json) a bitfield and mask the last n bits we want to use for encoding. It would be a simple change that would serve our need. What I am trying to say here is that I already thought about us needing a few more bits in the future, and it's more than doable :)
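If the few-spare-bits idea from the last comment is ever needed, the packing could look roughly like this (a hedged sketch of the suggestion, not code from the PR):

#include <cassert>
#include <cstdint>

// 56 bits still cover ~64 petabytes per object, while 8 bits become available
// for a per-object encoding / heuristic tag.
struct PackedJsonMeta {
  uint64_t bytes_used : 56;
  uint64_t encoding : 8;
};

int main() {
  static_assert(sizeof(PackedJsonMeta) == 8, "still a single 8-byte field");
  PackedJsonMeta meta{};
  meta.bytes_used = 5'000'000'000ULL;  // above the 4GB limit a plain uint32_t would impose
  meta.encoding = 1;
  assert(meta.bytes_used == 5'000'000'000ULL && meta.encoding == 1);
  return 0;
}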