diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 6d7db4ae1895..2b84f9c8ef70 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -33,6 +33,8 @@ extern "C" { ABSL_RETIRED_FLAG(bool, use_set2, true, "If true use DenseSet for an optimized set data structure"); +ABSL_FLAG(bool, experimental_flat_json, false, "If true uses flat json implementation."); + namespace dfly { using namespace std; using absl::GetFlag; @@ -520,6 +522,12 @@ void RobjWrapper::MakeInnerRoom(size_t current_cap, size_t desired, MemoryResour } // namespace detail +uint32_t JsonEnconding() { + static thread_local uint32_t json_enc = + absl::GetFlag(FLAGS_experimental_flat_json) ? kEncodingJsonFlat : kEncodingJsonCons; + return json_enc; +} + using namespace std; auto CompactObj::GetStats() -> Stats { @@ -575,7 +583,11 @@ size_t CompactObj::Size() const { raw_size = u_.r_obj.Size(); break; case JSON_TAG: - raw_size = u_.json_obj.json_len; + if (JsonEnconding() == kEncodingJsonFlat) { + raw_size = u_.json_obj.flat.json_len; + } else { + raw_size = u_.json_obj.cons.json_ptr->size(); + } break; case SBF_TAG: raw_size = u_.sbf->current_size(); @@ -680,32 +692,45 @@ std::optional CompactObj::TryGetInt() const { auto CompactObj::GetJson() const -> JsonType* { if (ObjType() == OBJ_JSON) { - DCHECK_EQ(u_.json_obj.encoding, kEncodingJsonCons); - return u_.json_obj.json_ptr; + DCHECK_EQ(JsonEnconding(), kEncodingJsonCons); + return u_.json_obj.cons.json_ptr; } return nullptr; } void CompactObj::SetJson(JsonType&& j) { - if (taglen_ == JSON_TAG && u_.json_obj.encoding == kEncodingJsonCons) { - // already json - DCHECK(u_.json_obj.json_ptr != nullptr); // must be allocated - u_.json_obj.json_len = j.size(); - u_.json_obj.json_ptr->swap(j); - } else { - SetMeta(JSON_TAG); - u_.json_obj.json_len = j.size(); - u_.json_obj.json_ptr = AllocateMR(std::move(j)); - u_.json_obj.encoding = kEncodingJsonCons; + if (taglen_ == JSON_TAG && JsonEnconding() == kEncodingJsonCons) { 
+ DCHECK(u_.json_obj.cons.json_ptr != nullptr); // must be allocated + u_.json_obj.cons.json_ptr->swap(j); + // We do not set bytes_used here as this is not needed. Consider the two following cases: + // 1. old json contains 50 bytes. The delta for new one is 50, so the total bytes + // the new json occupies is 100. + // 2. old json contains 100 bytes. The delta for new one is -50, so the total bytes + // the new json occupies is 50. + // Both of the cases are covered in SetJsonSize and JsonMemTracker. See below. + return; + } + + SetMeta(JSON_TAG); + u_.json_obj.cons.json_ptr = AllocateMR(std::move(j)); + u_.json_obj.cons.bytes_used = 0; +} + +void CompactObj::SetJsonSize(int64_t size) { + if (taglen_ == JSON_TAG && JsonEnconding() == kEncodingJsonCons) { + // JSON.SET or if mem hasn't changed from a JSON op then we just update. + if (size < 0) { + DCHECK(static_cast<int64_t>(u_.json_obj.cons.bytes_used) >= size); + } + u_.json_obj.cons.bytes_used += size; + } } void CompactObj::SetJson(const uint8_t* buf, size_t len) { SetMeta(JSON_TAG); - u_.json_obj.flat_ptr = (uint8_t*)tl.local_mr->allocate(len, kAlignSize); - memcpy(u_.json_obj.flat_ptr, buf, len); - u_.json_obj.encoding = kEncodingJsonFlat; - u_.json_obj.json_len = len; + u_.json_obj.flat.flat_ptr = (uint8_t*)tl.local_mr->allocate(len, kAlignSize); + memcpy(u_.json_obj.flat.flat_ptr, buf, len); + u_.json_obj.flat.json_len = len; } void CompactObj::SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor) { @@ -1013,10 +1038,10 @@ void CompactObj::Free() { u_.small_str.Free(); } else if (taglen_ == JSON_TAG) { DVLOG(1) << "Freeing JSON object"; - if (u_.json_obj.encoding == kEncodingJsonCons) { - DeleteMR(u_.json_obj.json_ptr); + if (JsonEnconding() == kEncodingJsonCons) { + DeleteMR(u_.json_obj.cons.json_ptr); } else { - tl.local_mr->deallocate(u_.json_obj.flat_ptr, u_.json_obj.json_len, kAlignSize); + tl.local_mr->deallocate(u_.json_obj.flat.flat_ptr, u_.json_obj.flat.json_len, kAlignSize); } } else if (taglen_ ==
SBF_TAG) { DeleteMR(u_.sbf); @@ -1036,8 +1061,13 @@ size_t CompactObj::MallocUsed() const { } if (taglen_ == JSON_TAG) { - DCHECK(u_.json_obj.json_ptr != nullptr); - return zmalloc_size(u_.json_obj.json_ptr); + // TODO fix this once we fully support flat json + // This is here because accessing a union field that is not active + // is UB. + if (JsonEnconding() == kEncodingJsonFlat) { + return 0; + } + return u_.json_obj.cons.bytes_used; } if (taglen_ == SMALL_TAG) { diff --git a/src/core/compact_object.h b/src/core/compact_object.h index fb50ae6ca1c5..7e878a6c92fd 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -12,6 +12,7 @@ #include "base/pmr/memory_resource.h" #include "core/json/json_object.h" +#include "core/mi_memory_resource.h" #include "core/small_string.h" #include "core/string_or_view.h" @@ -101,6 +102,8 @@ using CompactObjType = unsigned; constexpr CompactObjType kInvalidCompactObjType = std::numeric_limits::max(); +uint32_t JsonEnconding(); + class CompactObj { static constexpr unsigned kInlineLen = 16; @@ -309,6 +312,8 @@ class CompactObj { // into here, no copying is allowed! void SetJson(JsonType&& j); void SetJson(const uint8_t* buf, size_t len); + // Adjusts the size used by json + void SetJsonSize(int64_t size); // pre condition - the type here is OBJ_JSON and was set with SetJson JsonType* GetJson() const; @@ -445,13 +450,21 @@ class CompactObj { }; } __attribute__((packed)); + struct JsonConsT { + JsonType* json_ptr; + size_t bytes_used; + }; + + struct FlatJsonT { + uint32_t json_len; + uint8_t* flat_ptr; + }; + struct JsonWrapper { union { - JsonType* json_ptr; - uint8_t* flat_ptr; + JsonConsT cons; + FlatJsonT flat; }; - uint32_t json_len = 0; - uint8_t encoding = 0; }; // My main data structure. Union of representations. 
@@ -475,7 +488,7 @@ class CompactObj { } u_; // - static_assert(sizeof(u_) == 16, ""); + static_assert(sizeof(u_) == 16); uint8_t mask_ = 0; diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 4816a3d9224e..1bce46ebed2a 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -15,11 +15,13 @@ #include #include +#include "absl/cleanup/cleanup.h" #include "base/flags.h" #include "base/logging.h" #include "core/flatbuffers.h" #include "core/json/json_object.h" #include "core/json/path.h" +#include "core/mi_memory_resource.h" #include "facade/cmd_arg_parser.h" #include "facade/op_status.h" #include "server/acl/acl_commands_def.h" @@ -36,7 +38,6 @@ ABSL_FLAG(bool, jsonpathv2, true, "If true uses Dragonfly jsonpath implementation, " "otherwise uses legacy jsoncons implementation."); -ABSL_FLAG(bool, experimental_flat_json, false, "If true uses flat json implementation."); namespace dfly { @@ -51,6 +52,34 @@ using CI = CommandId; namespace { +class JsonMemTracker { + public: + JsonMemTracker() { + start_size_ = static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used(); + } + + void SetJsonSize(PrimeValue& pv, bool is_op_set) { + const size_t current = static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used(); + int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_); + // If the diff is 0 it means the object uses the same memory as before. No action needed. + if (diff == 0) { + return; + } + // If is_op_set is true it means JSON.SET or JSON.MSET was called. This is a blind update, + // and because the operation sets the size to 0 we also need to include the size of + // the pointer. + if (is_op_set) { + diff += static_cast<int64_t>(mi_usable_size(pv.GetJson())); + } + pv.SetJsonSize(diff); + // Under any flow we must not end up with this special value.
+ DCHECK(pv.MallocUsed() != 0); + } + + private: + size_t start_size_{0}; +}; + template using ParseResult = io::Result; ParseResult ParseJsonPathAsExpression(std::string_view path) { @@ -287,7 +316,7 @@ bool JsonAreEquals(const JsonType& lhs, const JsonType& rhs) { } } -facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& value) { +OpResult SetJson(const OpArgs& op_args, string_view key, JsonType value) { auto& db_slice = op_args.GetDbSlice(); auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); @@ -297,7 +326,7 @@ facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& valu op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second); - if (absl::GetFlag(FLAGS_experimental_flat_json)) { + if (JsonEnconding() == kEncodingJsonFlat) { flexbuffers::Builder fbb; json::FromJsonType(value, &fbb); fbb.Finish(); @@ -307,7 +336,7 @@ facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& valu res.it->second.SetJson(std::move(value)); } op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second); - return OpStatus::OK; + return std::move(res); } string JsonTypeToName(const JsonType& val) { @@ -572,6 +601,8 @@ OpResult>> JsonMutateOperation(const OpArgs& op_a auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON); RETURN_ON_BAD_STATUS(it_res); + JsonMemTracker mem_tracker; + PrimeValue& pv = it_res->it->second; JsonType* json_val = pv.GetJson(); @@ -587,6 +618,8 @@ OpResult>> JsonMutateOperation(const OpArgs& op_a options.verify_op(*json_val); } + // we need to manually run this before the PostUpdater run + mem_tracker.SetJsonSize(pv, false); it_res->post_updater.Run(); op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv); @@ -856,15 +889,23 @@ OpResult OpDel(const OpArgs& op_args, string_view key, string_view path, auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately return 
static_cast(db_slice.Del(op_args.db_cntx, it)); } - OpResult result = GetJson(op_args, key); - if (!result) { + JsonMemTracker tracker; + // FindMutable because we need to run the AutoUpdater at the end which will account + // the deltas calculated from the MemoryTracker + auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON); + if (!it_res) { return 0; } + PrimeValue& pv = it_res->it->second; + JsonType* json_val = pv.GetJson(); + + absl::Cleanup update_size_on_exit([tracker, &pv]() mutable { tracker.SetJsonSize(pv, false); }); + if (json_path.HoldsJsonPath()) { const json::Path& path = json_path.AsJsonPath(); long deletions = json::MutatePath( - path, [](optional, JsonType* val) { return true; }, *result); + path, [](optional, JsonType* val) { return true; }, json_val); return deletions; } @@ -874,7 +915,7 @@ OpResult OpDel(const OpArgs& op_args, string_view key, string_view path, return {}; }; - auto res = json_path.Mutate(result.value(), std::move(cb)); + auto res = json_path.Mutate(json_val, std::move(cb)); RETURN_ON_BAD_STATUS(res); if (deletion_items.empty()) { @@ -892,7 +933,7 @@ OpResult OpDel(const OpArgs& op_args, string_view key, string_view path, } std::error_code ec; - jsoncons::jsonpatch::apply_patch(*result.value(), patch, ec); + jsoncons::jsonpatch::apply_patch(*json_val, patch, ec); if (ec) { VLOG(1) << "Failed to apply patch on json with error: " << ec.message(); return 0; @@ -1262,10 +1303,18 @@ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, } } - OpStatus st = SetJson(op_args, key, std::move(parsed_json.value())); - if (st != OpStatus::OK) { - return st; + JsonMemTracker mem_tracker; + // We need to deep copy parsed_json.value() and not use move! The reason is that otherwise + // it's really difficult to properly track memory deltas because even if we move below, + // the deallocation of parsed_json won't happen in the scope of SetJson but in the scope + // of this function. 
Because of this, the memory tracking will be off. Another solution here, + is to use absl::Cleanup and dispatch another Find() but that's too complicated because then + you need to take into account the order of destructors. + OpResult st = SetJson(op_args, key, parsed_json.value()); + if (st.status() != OpStatus::OK) { + return st.status(); } + mem_tracker.SetJsonSize(st->it->second, st->is_new); return true; } @@ -1281,6 +1330,9 @@ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, path_exists = true; if (!is_nx_condition) { operation_result = true; + static_assert( + std::is_same_v::propagate_on_container_copy_assignment, + std::false_type>); *val = new_json; } return {}; @@ -1310,9 +1362,13 @@ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, return OpStatus::OK; }; + // JsonMutateOperation uses its own JsonMemTracker. It will work, because updates to already + // existing json keys use copy assign, so we don't really need to account for the memory + // allocated by ShardJsonFromString above since it's not being moved here at all. auto res = JsonMutateOperation(op_args, key, json_path, std::move(cb), MutateOperationOptions{std::move(inserter), {}}); RETURN_ON_BAD_STATUS(res); + return operation_result; } @@ -1364,8 +1420,6 @@ OpStatus OpMSet(const OpArgs& op_args, const ShardArgs& args) { // implemented yet.
OpStatus OpMerge(const OpArgs& op_args, string_view key, string_view path, const WrappedJsonPath& json_path, std::string_view json_str) { - // DCHECK(!json_path.HoldsJsonPath()); - std::optional parsed_json = ShardJsonFromString(json_str); if (!parsed_json) { VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved"; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 0863f09260d0..014f1e6a9dc0 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1043,11 +1043,17 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { std::memcpy(lp, src_lp, bytes); pv_->InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lp); } else if (rdb_type_ == RDB_TYPE_JSON) { - auto json = JsonFromString(blob, CompactObj::memory_resource()); - if (!json) { - ec_ = RdbError(errc::bad_json_string); + size_t start_size = static_cast(CompactObj::memory_resource())->used(); + { + auto json = JsonFromString(blob, CompactObj::memory_resource()); + if (!json) { + ec_ = RdbError(errc::bad_json_string); + } + pv_->SetJson(std::move(*json)); } - pv_->SetJson(std::move(*json)); + size_t end_size = static_cast(CompactObj::memory_resource())->used(); + DCHECK(end_size > start_size); + pv_->SetJsonSize(end_size - start_size); } else { LOG(FATAL) << "Unsupported rdb type " << rdb_type_; } diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index 127c769e4c47..060af26af150 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -1,6 +1,7 @@ import pytest from redis import asyncio as aioredis from .utility import * +import logging @pytest.mark.opt_only @@ -30,11 +31,19 @@ async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements): print(f"Running {cmd}") await client.execute_command(cmd) - await asyncio.sleep(1) # Wait for another RSS heartbeat update in Dragonfly + await asyncio.sleep(2) # Wait for another RSS heartbeat update in Dragonfly info = await client.info("memory") - 
print(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}') + logging.info(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}') assert info["used_memory"] > min_rss, "Weak testcase: too little used memory" - assert info["used_memory_rss"] - info["used_memory"] < max_unaccounted + delta = info["used_memory_rss"] - info["used_memory"] + # It could be the case that the machine is configured to use swap if this assertion fails + assert delta > 0 + assert delta < max_unaccounted + delta = info["used_memory_rss"] - info["object_used_memory"] + # TODO investigate why it fails on string + if type == "json": + assert delta > 0 + assert delta < max_unaccounted await disconnect_clients(client)