Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to target (i) memcached servers that support the binary protocol over UDP, and (ii) resource-contrained memcached servers (supporting only short keys); and to evaluate the latency of each transaction. #12

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
21 changes: 18 additions & 3 deletions client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ int client::connect(void)
evbuffer_drain(m_write_buf, evbuffer_get_length(m_write_buf));

if (m_unix_sockaddr != NULL) {
m_sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
m_sockfd = socket(AF_UNIX, m_config->use_udp ? SOCK_DGRAM : SOCK_STREAM, 0);
if (m_sockfd < 0) {
return -errno;
}
Expand All @@ -300,8 +300,10 @@ int client::connect(void)
error = setsockopt(m_sockfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
assert(error == 0);

error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
assert(error == 0);
if (!m_config->use_udp) {
error = setsockopt(m_sockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
assert(error == 0);
}
}

// set non-blcoking behavior
Expand Down Expand Up @@ -555,6 +557,7 @@ void client::create_request(void)
assert(key != NULL);
assert(keylen > 0);

keylen = 6; // FIXME const
benchmark_debug_log("GET key=[%.*s]\n", keylen, key);
cmd_size = m_protocol->write_command_get(key, keylen, m_config->data_offset);

Expand Down Expand Up @@ -618,13 +621,25 @@ void client::handle_response(request *request, protocol_response *response)
{
switch (request->m_type) {
case rt_get:
if (m_config->transaction_latency) {
// NOTE using printf adds latency to the client because of the system call, but we're measuring transaction latency, not throughput.
// FIXME might be preferable to print to some user-specified file, rather than to stdout
printf("GET %lu\n", ts_diff_now(request->m_sent_time));
}

m_stats.update_get_op(NULL,
request->m_size + response->get_total_len(),
ts_diff_now(request->m_sent_time),
response->get_hits(),
request->m_keys - response->get_hits());
break;
case rt_set:
if (m_config->transaction_latency) {
// NOTE using printf adds latency to the client because of the system call, but we're measuring transaction latency, not throughput.
// FIXME might be preferable to print to some user-specified file, rather than to stdout
printf("SET %lu\n", ts_diff_now(request->m_sent_time));
}

m_stats.update_set_op(NULL,
request->m_size + response->get_total_len(),
ts_diff_now(request->m_sent_time));
Expand Down
6 changes: 3 additions & 3 deletions config_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ const char* config_weight_list::print(char *buf, int buf_len)
}


server_addr::server_addr(const char *hostname, int port) :
m_hostname(hostname), m_port(port), m_server_addr(NULL), m_used_addr(NULL), m_last_error(0)
server_addr::server_addr(const char *hostname, int port, transport_protocol protocol) :
m_hostname(hostname), m_port(port), m_protocol(protocol), m_server_addr(NULL), m_used_addr(NULL), m_last_error(0)
{
int error = resolve();

Expand All @@ -215,7 +215,7 @@ int server_addr::resolve(void)

memset(&hints, 0, sizeof(hints));
hints.ai_flags = AI_PASSIVE;
hints.ai_socktype = SOCK_STREAM;
hints.ai_socktype = m_protocol;
hints.ai_family = AF_INET; // Don't play with IPv6 for now...

snprintf(port_str, sizeof(port_str)-1, "%u", m_port);
Expand Down
4 changes: 3 additions & 1 deletion config_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ struct connect_info {
};

struct server_addr {
server_addr(const char *hostname, int port);
enum transport_protocol {TCP=SOCK_STREAM, UDP=SOCK_DGRAM};
server_addr(const char *hostname, int port, transport_protocol proto);
virtual ~server_addr();

int get_connect_info(struct connect_info *ci);
Expand All @@ -88,6 +89,7 @@ struct server_addr {

std::string m_hostname;
int m_port;
int m_protocol;
struct addrinfo *m_server_addr;
struct addrinfo *m_used_addr;
int m_last_error;
Expand Down
16 changes: 16 additions & 0 deletions libmemcached_protocol/binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,22 @@ extern "C"
PROTOCOL_BINARY_RAW_BYTES = 0x00
} protocol_binary_datatypes;

/**
* Definition of the extra header that is used when using the
* memcached binary protocol over UDP. This header lies between
* the UDP header and the memcached datagram.
* See https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L922
*/
typedef union {
struct {
uint16_t request_id;
uint16_t sequence_no;
uint16_t total_datagrams;
uint16_t reserved; // Must be set to 0
} header;
uint8_t bytes[8];
} protocol_binary_udp_header;

/**
* Definition of the header structure for a request packet.
* See section 2
Expand Down
47 changes: 40 additions & 7 deletions memtier_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
"multi_key_get = %u\n"
"authenticate = %s\n"
"select-db = %d\n"
"no-expiry = %s\n",
"no-expiry = %s\n"
"transaction_latency = %s\n"
"key-width = %u\n"
"udp = %s\n",
cfg->server,
cfg->port,
cfg->unix_socket,
Expand Down Expand Up @@ -155,7 +158,10 @@ static void config_print(FILE *file, struct benchmark_config *cfg)
cfg->multi_key_get,
cfg->authenticate ? cfg->authenticate : "",
cfg->select_db,
cfg->no_expiry ? "yes" : "no");
cfg->no_expiry ? "yes" : "no",
cfg->transaction_latency ? "yes" : "no",
cfg->key_width,
cfg->use_udp ? "yes" : "no");
}

static void config_init_defaults(struct benchmark_config *cfg)
Expand Down Expand Up @@ -196,6 +202,8 @@ static void config_init_defaults(struct benchmark_config *cfg)
}
if (!cfg->requests && !cfg->test_time)
cfg->requests = 10000;
if (!cfg->key_width)
cfg->key_width = OBJECT_GENERATOR_KEY_WIDTH;
}

static int generate_random_seed()
Expand Down Expand Up @@ -241,7 +249,10 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
o_generate_keys,
o_multi_key_get,
o_select_db,
o_no_expiry
o_no_expiry,
o_transaction_latency,
o_key_width,
o_use_udp,
};

static struct option long_options[] = {
Expand Down Expand Up @@ -287,6 +298,9 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
{ "no-expiry", 0, 0, o_no_expiry },
{ "help", 0, 0, 'h' },
{ "version", 0, 0, 'v' },
{ "transaction_latency", 0, 0, o_transaction_latency },
{ "key-width", 1, 0, o_key_width },
{ "udp", 0, 0, o_use_udp },
{ NULL, 0, 0, 0 }
};

Expand Down Expand Up @@ -553,6 +567,22 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf
case o_no_expiry:
cfg->no_expiry = true;
break;
case o_transaction_latency:
cfg->transaction_latency = true;
break;
case o_key_width:
endptr = NULL;
cfg->key_width = (unsigned short) strtoul(optarg, &endptr, 10);
if (!cfg->key_width || cfg->key_width > OBJECT_GENERATOR_KEY_WIDTH || !endptr || *endptr != '\0') {
fprintf(stderr, "error: key-width must be a number in the range [1-%u].\n", OBJECT_GENERATOR_KEY_WIDTH);
return -1;
}
break;
case o_use_udp:
//FIXME check that key and value size can fit in a datagram
fprintf(stderr, "Warning: generating traffic over UDP is still experimental. Check that the size of requests can fit in a UDP datagram.");
cfg->use_udp = true;
break;
default:
return -1;
break;
Expand All @@ -570,6 +600,7 @@ void usage() {
" -s, --server=ADDR Server address (default: localhost)\n"
" -p, --port=PORT Server port (default: 6379)\n"
" -S, --unix-socket=SOCKET UNIX Domain socket name (default: none)\n"
" --udp Connect using UDP rather than TCP (default: false)\n"
" -P, --protocol=PROTOCOL Protocol to use (default: redis). Other\n"
" supported protocols are memcache_text,\n"
" memcache_binary.\n"
Expand Down Expand Up @@ -597,6 +628,7 @@ void usage() {
" --select-db=DB DB number to select, when testing a redis server\n"
" --distinct-client-seed Use a different random seed for each client\n"
" --randomize random seed based on timestamp (default is constant value)\n"
" --transaction_latency Measure and report the latency of each transaction\n"
"\n"
"Object Options:\n"
" -d --data-size=SIZE Object data size (default: 32)\n"
Expand All @@ -619,6 +651,7 @@ void usage() {
" --no-expiry Ignore expiry information in imported data\n"
"\n"
"Key Options:\n"
" --key-width=NUMBER Maximum key size (default: \"250\")\n"
" --key-prefix=PREFIX Prefix for keys (default: \"memtier-\")\n"
" --key-minimum=NUMBER Key ID minimum value (default: 0)\n"
" --key-maximum=NUMBER Key ID maximum value (default: 10000000)\n"
Expand Down Expand Up @@ -651,7 +684,7 @@ struct cg_thread {
cg_thread(unsigned int id, benchmark_config* config, object_generator* obj_gen) :
m_thread_id(id), m_config(config), m_obj_gen(obj_gen), m_cg(NULL), m_protocol(NULL), m_finished(false)
{
m_protocol = protocol_factory(m_config->protocol);
m_protocol = protocol_factory(m_config->protocol, m_config->use_udp);
assert(m_protocol != NULL);

m_cg = new client_group(m_config, m_protocol, m_obj_gen);
Expand Down Expand Up @@ -868,7 +901,7 @@ int main(int argc, char *argv[])

if (cfg.server != NULL && cfg.port > 0) {
try {
cfg.server_addr = new server_addr(cfg.server, cfg.port);
cfg.server_addr = new server_addr(cfg.server, cfg.port, cfg.use_udp ? server_addr::UDP : server_addr::TCP);
} catch (std::runtime_error& e) {
benchmark_error_log("%s:%u: error: %s\n",
cfg.server, cfg.port, e.what());
Expand Down Expand Up @@ -903,7 +936,7 @@ int main(int argc, char *argv[])
exit(1);
}

obj_gen = new object_generator();
obj_gen = new object_generator(cfg.key_width);
assert(obj_gen != NULL);
} else {
// check paramters
Expand Down Expand Up @@ -1094,7 +1127,7 @@ int main(int argc, char *argv[])
// If needed, data verification is done now...
if (cfg.data_verify) {
struct event_base *verify_event_base = event_base_new();
abstract_protocol *verify_protocol = protocol_factory(cfg.protocol);
abstract_protocol *verify_protocol = protocol_factory(cfg.protocol, cfg.use_udp);
verify_client *client = new verify_client(verify_event_base, &cfg, verify_protocol, obj_gen);

fprintf(outfile, "\n\nPerforming data verification...\n");
Expand Down
3 changes: 3 additions & 0 deletions memtier_benchmark.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ struct benchmark_config {
int select_db;
bool no_expiry;
bool resolve_on_connect;
bool transaction_latency;
unsigned short key_width;
bool use_udp;
};


Expand Down
45 changes: 42 additions & 3 deletions obj_gen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <unistd.h>
#include <fcntl.h>
#include <math.h>
#include <stdexcept>

#ifdef HAVE_ASSERT_H
#include <assert.h>
Expand Down Expand Up @@ -117,6 +118,7 @@ int gaussian_noise::gaussian_distribution_range(double stddev, double median, in
}

object_generator::object_generator() :
m_key_width(OBJECT_GENERATOR_KEY_WIDTH),
m_data_size_type(data_size_unknown),
m_data_size_pattern(NULL),
m_random_data(false),
Expand All @@ -136,7 +138,31 @@ object_generator::object_generator() :
m_data_size.size_list = NULL;
}

object_generator::object_generator(unsigned int key_width) :
m_key_width(key_width),
m_data_size_type(data_size_unknown),
m_data_size_pattern(NULL),
m_random_data(false),
m_expiry_min(0),
m_expiry_max(0),
m_key_prefix(NULL),
m_key_min(0),
m_key_max(0),
m_key_stddev(0),
m_key_median(0),
m_value_buffer(NULL),
m_random_fd(-1)
{
assert(m_key_width <= OBJECT_GENERATOR_KEY_WIDTH);

for (int i = 0; i < OBJECT_GENERATOR_KEY_ITERATORS; i++)
m_next_key[i] = 0;

m_data_size.size_list = NULL;
}

object_generator::object_generator(const object_generator& copy) :
m_key_width(copy.m_key_width),
m_data_size_type(copy.m_data_size_type),
m_data_size(copy.m_data_size),
m_data_size_pattern(copy.m_data_size_pattern),
Expand All @@ -150,7 +176,6 @@ object_generator::object_generator(const object_generator& copy) :
m_key_median(copy.m_key_median),
m_value_buffer(NULL),
m_random_fd(-1)

{
if (m_data_size_type == data_size_weighted &&
m_data_size.size_list != NULL) {
Expand Down Expand Up @@ -304,15 +329,29 @@ void object_generator::set_expiry_range(unsigned int expiry_min, unsigned int ex
m_expiry_max = expiry_max;
}

void object_generator::check_key_size()
{
unsigned int width_of_key_prefix = m_key_prefix == NULL ? 0 : strlen(m_key_prefix);
unsigned int width_of_key_max = (unsigned)log10((double)m_key_max) + 1;
if (width_of_key_prefix + width_of_key_max > m_key_width) {
char str [200];
sprintf(str, "Key prefix '%s' (length %u) exceeds maximum key width (%u) when combined with the maximum key index (%u, length %u)", m_key_prefix, width_of_key_prefix, m_key_width, m_key_max, width_of_key_max);
throw std::logic_error(str);
}
}

void object_generator::set_key_prefix(const char *key_prefix)
{
m_key_prefix = key_prefix;
check_key_size();
}

void object_generator::set_key_range(unsigned int key_min, unsigned int key_max)
{
assert (key_min <= key_max);
m_key_min = key_min;
m_key_max = key_max;
check_key_size();
}

void object_generator::set_key_distribution(double key_stddev, double key_median)
Expand Down Expand Up @@ -360,9 +399,9 @@ const char* object_generator::get_key(int iter, unsigned int *len)
{
unsigned int l;
m_key_index = get_key_index(iter);

// format key
l = snprintf(m_key_buffer, sizeof(m_key_buffer)-1,
l = snprintf(m_key_buffer, m_key_width - 1,
"%s%u", m_key_prefix, m_key_index);
if (len != NULL) *len = l;

Expand Down
Loading