diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index bd926b45baf3..d227cd8231e0 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -50,6 +50,7 @@ extern "C" { #include "server/serializer_commons.h" #include "server/server_state.h" #include "server/set_family.h" +#include "server/stream_family.h" #include "server/transaction.h" #include "strings/human_readable.h" @@ -703,6 +704,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { stream* s; + StreamMemTracker mem_tracker; if (config_.append) { if (!EnsureObjEncoding(OBJ_STREAM, OBJ_ENCODING_STREAM)) { return; @@ -848,6 +850,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { if (!config_.append) { pv_->InitRobj(OBJ_STREAM, OBJ_ENCODING_STREAM, s); } + mem_tracker.UpdateStreamSize(*pv_); } void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index faa1966abb5c..293093e0ec65 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -27,6 +27,18 @@ namespace dfly { using namespace facade; using namespace std; +StreamMemTracker::StreamMemTracker() { + start_size_ = zmalloc_used_memory_tl; +} + +void StreamMemTracker::UpdateStreamSize(PrimeValue& pv) const { + const size_t current = zmalloc_used_memory_tl; + int64_t diff = static_cast(current) - static_cast(start_size_); + pv.AddStreamSize(diff); + // Under any flow we must not end up with this special value. + DCHECK(pv.MallocUsed() != 0); +} + namespace { struct Record { @@ -612,24 +624,6 @@ int StreamTrim(const AddTrimOpts& opts, stream* s) { return 0; } -class StreamMemTracker { - public: - StreamMemTracker() { - start_size_ = zmalloc_used_memory_tl; - } - - void UpdateStreamSize(PrimeValue& pv) const { - const size_t current = zmalloc_used_memory_tl; - int64_t diff = static_cast(current) - static_cast(start_size_); - pv.AddStreamSize(diff); - // Under any flow we must not end up with this special value. - DCHECK(pv.MallocUsed() != 0); - } - - private: - size_t start_size_{0}; -}; - OpResult OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) { DCHECK(!args.empty() && args.size() % 2 == 0); auto& db_slice = op_args.GetDbSlice(); diff --git a/src/server/stream_family.h b/src/server/stream_family.h index 4f964ff914c0..715fd36571ed 100644 --- a/src/server/stream_family.h +++ b/src/server/stream_family.h @@ -15,6 +15,19 @@ namespace dfly { class CommandRegistry; struct CommandContext; +class CompactObj; +using PrimeValue = CompactObj; + +class StreamMemTracker { + public: + StreamMemTracker(); + + void UpdateStreamSize(PrimeValue& pv) const; + + private: + size_t start_size_{0}; +}; + class StreamFamily { public: static void Register(CommandRegistry* registry);