Skip to content

Commit

Permalink
chore: dfly_bench can send traffic to memcached (#3271)
Browse files Browse the repository at this point in the history
Also, add hit rate calculation and controllable seed sequences.

The last ones are important if we want to send read traffic with the same key sequence as before
to estimate the hit rate of the system.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jul 5, 2024
1 parent d9dd54d commit e213d60
Showing 1 changed file with 149 additions and 22 deletions.
171 changes: 149 additions & 22 deletions src/server/dfly_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//

#include <absl/random/random.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h>
#include <absl/strings/str_split.h>
Expand All @@ -13,6 +14,7 @@
#include "absl/time/time.h"
#include "base/histogram.h"
#include "base/init.h"
#include "base/random.h"
#include "base/zipf_gen.h"
#include "facade/redis_parser.h"
#include "io/io_buf.h"
Expand All @@ -35,11 +37,13 @@ ABSL_FLAG(uint64_t, key_maximum, 50'000'000, "Max value for keys used");
ABSL_FLAG(string, key_prefix, "key:", "keys prefix");
ABSL_FLAG(string, key_dist, "U", "U for uniform, N for normal, Z for zipfian");
ABSL_FLAG(double, zipf_alpha, 0.99, "zipfian alpha parameter");
ABSL_FLAG(uint64_t, seed, 42, "A seed for random data generation");
ABSL_FLAG(uint64_t, key_stddev, 0,
"Standard deviation for non-uniform distribution, 0 chooses"
" a default value of (max-min)/6");
ABSL_FLAG(string, ratio, "1:10", "Set:Get ratio");
ABSL_FLAG(string, command, "", "custom command with __key__ placeholder for keys");
ABSL_FLAG(string, P, "", "protocol can be empty (for RESP) or memcache_text");

using namespace std;
using namespace util;
Expand All @@ -50,7 +54,9 @@ using tcp = ::boost::asio::ip::tcp;

constexpr string_view kKeyPlaceholder = "__key__"sv;

thread_local absl::InsecureBitGen bit_gen;
thread_local base::Xoroshiro128p bit_gen;

enum Protocol { RESP, MC_TEXT } protocol;

class KeyGenerator {
public:
Expand All @@ -72,13 +78,21 @@ class CommandGenerator {

string operator()();

bool might_hit() const {
return might_hit_;
}

private:
void FillSet(string_view key);
void FillGet(string_view key);

KeyGenerator* keygen_;
uint32_t ratio_set_ = 0, ratio_get_ = 0;
string command_;
string cmd_;
std::vector<size_t> key_indices_;
string value_;
bool might_hit_ = false;
};

CommandGenerator::CommandGenerator(KeyGenerator* keygen) : keygen_(keygen) {
Expand All @@ -104,9 +118,11 @@ string CommandGenerator::operator()() {
key = (*keygen_)();

if (absl::Uniform(bit_gen, 0U, ratio_get_ + ratio_set_) < ratio_set_) {
absl::StrAppend(&cmd_, "set ", key, " ", value_, "\r\n");
FillSet(key);
might_hit_ = false;
} else {
absl::StrAppend(&cmd_, "get ", key, "\r\n");
FillGet(key);
might_hit_ = true;
}
} else {
size_t last_pos = 0;
Expand All @@ -120,6 +136,20 @@ string CommandGenerator::operator()() {
return cmd_;
}

void CommandGenerator::FillSet(string_view key) {
if (protocol == RESP) {
absl::StrAppend(&cmd_, "set ", key, " ", value_, "\r\n");
} else {
DCHECK_EQ(protocol, MC_TEXT);
absl::StrAppend(&cmd_, "set ", key, " 0 0 ", value_.size(), "\r\n");
absl::StrAppend(&cmd_, value_, "\r\n");
}
}

void CommandGenerator::FillGet(string_view key) {
absl::StrAppend(&cmd_, "get ", key, "\r\n");
}

// Per connection driver.
class Driver {
public:
Expand All @@ -135,15 +165,25 @@ class Driver {
void Connect(unsigned index, const tcp::endpoint& ep);
void Run(uint32_t num_reqs, uint64_t cycle_ns, base::Histogram* dest);

uint64_t hit_count() const {
return hit_count_;
}

uint64_t hit_opportunities() const {
return hit_opportunities_;
}

private:
void ReceiveFb(base::Histogram* dest);

struct Req {
uint64_t start;
bool might_hit;
};

unique_ptr<FiberSocketBase> socket_;
queue<Req> reqs_;
uint64_t hit_count_ = 0, hit_opportunities_ = 0;
};

// Per thread client.
Expand All @@ -162,6 +202,8 @@ class TLocalClient {
void Run(uint64_t cycle_ns);

base::Histogram hist;
uint64_t hit_count = 0;
uint64_t hit_opportunities = 0;

private:
ProactorBase* p_;
Expand Down Expand Up @@ -243,8 +285,9 @@ void Driver::Run(uint32_t num_reqs, uint64_t cycle_ns, base::Histogram* dest) {

Req req;
req.start = absl::GetCurrentTimeNanos();
req.might_hit = cmd_gen.might_hit();

reqs_.push(req);
// TODO: add type (get/set)

error_code ec = socket_->Write(io::Buffer(cmd));
if (ec && FiberSocketBase::IsConnClosed(ec)) {
Expand All @@ -269,11 +312,36 @@ void Driver::Run(uint32_t num_reqs, uint64_t cycle_ns, base::Histogram* dest) {
std::ignore = socket_->Close();
}

static string_view FindLine(io::Bytes buf) {
if (buf.size() < 2)
return {};
for (unsigned i = 0; i < buf.size() - 1; ++i) {
if (buf[i] == '\r' && buf[i + 1] == '\n') {
return io::View(buf.subspan(0, i + 2));
}
}
return {};
};

void Driver::ReceiveFb(base::Histogram* dest) {
facade::RedisParser parser{1 << 16, false};
io::IoBuf io_buf{512};
unsigned num_resp = 0;

auto pop_req = [&] {
uint64_t now = absl::GetCurrentTimeNanos();
uint64_t usec = (now - reqs_.front().start) / 1000;
dest->Add(usec);
hit_opportunities_ += reqs_.front().might_hit;

reqs_.pop();
++num_resp;
};

unsigned blob_len = 0;

while (true) {
io_buf.EnsureCapacity(256);
auto buf = io_buf.AppendBuffer();
VLOG(2) << "Socket read: " << reqs_.size() << " " << num_resp;

Expand All @@ -284,22 +352,53 @@ void Driver::ReceiveFb(base::Histogram* dest) {
CHECK(recv_sz) << recv_sz.error().message();
io_buf.CommitWrite(*recv_sz);

uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec parse_args;

do {
result = parser.Parse(io_buf.InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
uint64_t now = absl::GetCurrentTimeNanos();
uint64_t usec = (now - reqs_.front().start) / 1000;
dest->Add(usec);
reqs_.pop();
parse_args.clear();
++num_resp;
if (protocol == RESP) {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec parse_args;

do {
result = parser.Parse(io_buf.InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
if (reqs_.front().might_hit && parse_args[0].type != facade::RespExpr::NIL) {
++hit_count_;
}
parse_args.clear();
pop_req();
}
io_buf.ConsumeInput(consumed);
} while (result == RedisParser::OK);
} else {
// MC_TEXT
while (true) {
string_view line = FindLine(io_buf.InputBuffer());
if (line.empty())
break;
CHECK_EQ(line.back(), '\n');
if (line == "STORED\r\n" || line == "END\r\n") {
pop_req();
blob_len = 0;
} else if (absl::StartsWith(line, "VALUE")) {
// last token is a blob length.
auto it = line.rbegin();
while (it != line.rend() && *it != ' ')
++it;
size_t len = it - line.rbegin() - 2;
const char* start = &(*it) + 1;
if (!absl::SimpleAtoi(string(start, len), &blob_len)) {
LOG(ERROR) << "Invalid blob len " << line;
return;
}
++hit_count_;
} else {
auto handle = socket_->native_handle();
CHECK_EQ(blob_len + 2, line.size());
blob_len = 0;
VLOG(2) << "Got line " << handle << ": " << line;
}
io_buf.ConsumeInput(line.size());
}
io_buf.ConsumeInput(consumed);
} while (result == RedisParser::OK);
}
}
VLOG(1) << "ReceiveFb done";
}
Expand Down Expand Up @@ -330,6 +429,12 @@ void TLocalClient::Run(uint64_t cycle_ns) {

for (auto& fb : fbs)
fb.Join();

for (size_t i = 0; i < drivers_.size(); ++i) {
hit_count += drivers_[i].hit_count();
hit_opportunities += drivers_[i].hit_opportunities();
}
VLOG(1) << "Total hits: " << hit_count;
}

int main(int argc, char* argv[]) {
Expand All @@ -339,6 +444,14 @@ int main(int argc, char* argv[]) {
pp.reset(fb2::Pool::IOUring(256));
pp->Run();

string proto_str = GetFlag(FLAGS_P);
if (proto_str == "memcache_text") {
protocol = MC_TEXT;
} else {
CHECK(proto_str.empty());
protocol = RESP;
}

auto* proactor = pp->GetNextProactor();
char ip_addr[128];

Expand All @@ -352,7 +465,11 @@ int main(int argc, char* argv[]) {
thread_local unique_ptr<TLocalClient> client;

LOG(INFO) << "Connecting threads";
pp->AwaitFiberOnAll([&](auto* p) {
pp->AwaitFiberOnAll([&](unsigned index, auto* p) {
base::SplitMix64 seed_mix(GetFlag(FLAGS_seed) + index * 0x6a45554a264d72bULL);
auto seed = seed_mix();
VLOG(1) << "Seeding bitgen with seed " << seed;
bit_gen.seed(seed);
client = make_unique<TLocalClient>(p);
client->Connect(ep);
});
Expand All @@ -373,15 +490,25 @@ int main(int argc, char* argv[]) {

fb2::Mutex mutex;
base::Histogram hist;

LOG(INFO) << "Resetting all threads";
uint64_t hit_opportunities = 0, hit_count = 0;

pp->AwaitFiberOnAll([&](auto* p) {
lock_guard gu(mutex);
unique_lock lk(mutex);
hist.Merge(client->hist);

hit_opportunities += client->hit_opportunities;
hit_count += client->hit_count;
lk.unlock();
client.reset();
});

CONSOLE_INFO << "Latency summary, all times are in usec:\n" << hist.ToString();

if (hit_opportunities) {
CONSOLE_INFO << "----------------------------------\nHit rate: "
<< 100 * double(hit_count) / double(hit_opportunities) << "%\n";
}
pp->Stop();

return 0;
Expand Down

0 comments on commit e213d60

Please sign in to comment.