Skip to content

Commit

Permalink
fix: stream memory counting durin snapshot loading
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Dec 20, 2024
1 parent c5ef553 commit b6ed223
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
3 changes: 3 additions & 0 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ extern "C" {
#include "server/serializer_commons.h"
#include "server/server_state.h"
#include "server/set_family.h"
#include "server/stream_family.h"
#include "server/transaction.h"
#include "strings/human_readable.h"

Expand Down Expand Up @@ -703,6 +704,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {

void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
stream* s;
StreamMemTracker mem_tracker;
if (config_.append) {
if (!EnsureObjEncoding(OBJ_STREAM, OBJ_ENCODING_STREAM)) {
return;
Expand Down Expand Up @@ -848,6 +850,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
if (!config_.append) {
pv_->InitRobj(OBJ_STREAM, OBJ_ENCODING_STREAM, s);
}
mem_tracker.UpdateStreamSize(*pv_);
}

void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
Expand Down
30 changes: 12 additions & 18 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ namespace dfly {
using namespace facade;
using namespace std;

StreamMemTracker::StreamMemTracker() {
start_size_ = zmalloc_used_memory_tl;
}

void StreamMemTracker::UpdateStreamSize(PrimeValue& pv) const {
const size_t current = zmalloc_used_memory_tl;
int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_);
pv.AddStreamSize(diff);
// Under any flow we must not end up with this special value.
DCHECK(pv.MallocUsed() != 0);
}

namespace {

struct Record {
Expand Down Expand Up @@ -612,24 +624,6 @@ int StreamTrim(const AddTrimOpts& opts, stream* s) {
return 0;
}

class StreamMemTracker {
public:
StreamMemTracker() {
start_size_ = zmalloc_used_memory_tl;
}

void UpdateStreamSize(PrimeValue& pv) const {
const size_t current = zmalloc_used_memory_tl;
int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_);
pv.AddStreamSize(diff);
// Under any flow we must not end up with this special value.
DCHECK(pv.MallocUsed() != 0);
}

private:
size_t start_size_{0};
};

OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) {
DCHECK(!args.empty() && args.size() % 2 == 0);
auto& db_slice = op_args.GetDbSlice();
Expand Down
13 changes: 13 additions & 0 deletions src/server/stream_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ namespace dfly {
class CommandRegistry;
struct CommandContext;

class CompactObj;
using PrimeValue = CompactObj;

class StreamMemTracker {
public:
StreamMemTracker();

void UpdateStreamSize(PrimeValue& pv) const;

private:
size_t start_size_{0};
};

class StreamFamily {
public:
static void Register(CommandRegistry* registry);
Expand Down

0 comments on commit b6ed223

Please sign in to comment.