Skip to content

Commit

Permalink
test: queue_perf: add queue pair test mode
Browse files Browse the repository at this point in the history
Add pair test mode where events are moved between queue pairs connecting
two or more workers. This mode requires an even number of both queues and
workers. The pair test mode can be enabled with new '-m, --mode <arg>'
command line option.

Signed-off-by: Matias Elo <[email protected]>
Reviewed-by: Tuomas Taipale <[email protected]>
  • Loading branch information
MatiasElo committed Nov 1, 2024
1 parent 3889ea3 commit 6aa0ab7
Showing 1 changed file with 89 additions and 24 deletions.
113 changes: 89 additions & 24 deletions test/performance/odp_queue_perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@

#define MAX_QUEUES (32 * 1024)

typedef enum {
TEST_MODE_LOOP = 0,
TEST_MODE_PAIR,
} test_mode_t;

typedef struct test_options_t {
uint32_t num_queue;
uint32_t num_event;
uint32_t num_round;
uint32_t max_burst;
uint32_t num_cpu;
odp_nonblocking_t nonblock;
test_mode_t mode;
odp_bool_t private_queues;
odp_bool_t single;

Expand Down Expand Up @@ -79,6 +85,12 @@ static void print_usage(void)
"\n"
"Usage: odp_queue_perf [options]\n"
"\n"
" -m, --mode <arg> Test mode:\n"
" 0: Loop: events are enqueued back to the same queue they\n"
" were dequeued from (default)\n"
" 1: Pair: queues are paired and events are always moved\n"
" between the queues when doing dequeue/enqueue. Requires\n"
" an even number of both queues and workers.\n"
" -c, --num_cpu Number of worker threads (default 1)\n"
" -q, --num_queue Number of queues (default 1)\n"
" -e, --num_event Number of events per queue (default 1)\n"
Expand All @@ -94,15 +106,15 @@ static void print_usage(void)

static int parse_options(int argc, char *argv[], test_options_t *test_options)
{
int opt;
int long_index;
int opt, long_index, num_cpu;
int ret = 0;

static const struct option longopts[] = {
{"num_cpu", required_argument, NULL, 'c'},
{"num_queue", required_argument, NULL, 'q'},
{"num_event", required_argument, NULL, 'e'},
{"burst_size", required_argument, NULL, 'b'},
{"mode", required_argument, NULL, 'm'},
{"private", no_argument, NULL, 'p'},
{"num_round", required_argument, NULL, 'r'},
{"lockfree", no_argument, NULL, 'l'},
Expand All @@ -112,12 +124,13 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
{NULL, 0, NULL, 0}
};

static const char *shortopts = "+c:q:e:b:pr:lwsh";
static const char *shortopts = "+c:q:e:b:m:pr:lwsh";

test_options->num_cpu = 1;
test_options->num_queue = 1;
test_options->num_event = 1;
test_options->max_burst = 1;
test_options->mode = TEST_MODE_LOOP;
test_options->num_round = 1000;
test_options->nonblock = ODP_BLOCKING;
test_options->single = false;
Expand All @@ -142,6 +155,10 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
case 'b':
test_options->max_burst = atoi(optarg);
break;
case 'm':
if (atoi(optarg) == TEST_MODE_PAIR)
test_options->mode = TEST_MODE_PAIR;
break;
case 'r':
test_options->num_round = atoi(optarg);
break;
Expand All @@ -166,23 +183,17 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
}
}

if (test_options->num_queue > MAX_QUEUES) {
ODPH_ERR("Too many queues %u. Test maximum %u.\n",
if (test_options->num_queue > MAX_QUEUES || test_options->num_queue == 0) {
ODPH_ERR("Invalid number of queues %u. Test maximum %u.\n",
test_options->num_queue, MAX_QUEUES);
return -1;
}

if (test_options->single && !test_options->private_queues && test_options->num_cpu != 1) {
ODPH_ERR("Using single prod/cons queue(s) with multiple workers not supported.\n");
return -1;
}
num_cpu = test_options->num_cpu;
if (num_cpu == 0)
num_cpu = odp_cpumask_default_worker(NULL, 0);

if (test_options->private_queues) {
int num_cpu = test_options->num_cpu;

if (num_cpu == 0)
num_cpu = odp_cpumask_default_worker(NULL, 0);

if ((int)test_options->num_queue < num_cpu) {
ODPH_ERR("Not enough queues for %d workers.\n", num_cpu);
return -1;
Expand All @@ -192,6 +203,19 @@ static int parse_options(int argc, char *argv[], test_options_t *test_options)
"workers.\n", test_options->num_queue, num_cpu);
}

if (test_options->single && !test_options->private_queues) {
if ((test_options->mode == TEST_MODE_LOOP && num_cpu != 1) ||
(test_options->mode == TEST_MODE_PAIR && num_cpu != 2)) {
ODPH_ERR("Multiple producers/consumers not allowed with single prod/cons queues.\n");
return -1;
}
}

if (test_options->mode == TEST_MODE_PAIR && (test_options->num_queue % 2 || num_cpu % 2)) {
ODPH_ERR("Pair mode requires an even number of queues and workers.\n");
return -1;
}

return ret;
}

Expand All @@ -217,6 +241,8 @@ static int create_queues(test_global_t *global)
nonblock == ODP_BLOCKING ? "NORMAL" :
(nonblock == ODP_NONBLOCKING_LF ? "LOCKFREE" :
(nonblock == ODP_NONBLOCKING_WF ? "WAITFREE" : "???")));
printf(" mode %s\n", test_options->mode == TEST_MODE_LOOP ?
"loop" : "pair");
printf(" private queues %s\n", test_options->private_queues ? "yes" : "no");
printf(" single prod/cons %s\n", test_options->single ? "yes" : "no");
printf(" num rounds %u\n", num_round);
Expand Down Expand Up @@ -490,26 +516,65 @@ static void map_queues_to_threads(test_global_t *global)
{
test_options_t *opt = &global->options;

if (!opt->private_queues) {
if (opt->mode == TEST_MODE_LOOP) {
if (!opt->private_queues) {
for (uint32_t i = 0; i < opt->num_queue; i++) {
for (uint32_t j = 0; j < opt->num_cpu; j++) {
thread_args_t *thread_args = &global->thread_args[j];

thread_args->src_queue_id[i] = i;
thread_args->dst_queue_id[i] = i;
thread_args->num_queues++;
}
}
return;
}

for (uint32_t i = 0; i < opt->num_queue; i++) {
thread_args_t *thread_args = &global->thread_args[i % opt->num_cpu];
uint32_t queue_idx = thread_args->num_queues;

thread_args->src_queue_id[queue_idx] = i;
thread_args->dst_queue_id[queue_idx] = i;
thread_args->num_queues++;
}
return;
}
/* Pair mode. Always an even number of both queues and CPUs. */
if (!opt->private_queues) {
for (uint32_t i = 0; i < opt->num_queue; i += 2) {
for (uint32_t j = 0; j < opt->num_cpu; j++) {
thread_args_t *thread_args = &global->thread_args[j];

thread_args->src_queue_id[i] = i;
thread_args->dst_queue_id[i] = i;
uint32_t num_queues = thread_args->num_queues;

if (j % 2 == 0) {
thread_args->src_queue_id[num_queues] = i;
thread_args->dst_queue_id[num_queues] = i + 1;
} else {
thread_args->src_queue_id[num_queues] = i + 1;
thread_args->dst_queue_id[num_queues] = i;
}
thread_args->num_queues++;
}
}
return;
}

for (uint32_t i = 0; i < opt->num_queue; i++) {
thread_args_t *thread_args = &global->thread_args[i % opt->num_cpu];
uint32_t queue_idx = thread_args->num_queues;
for (uint32_t i = 0; i < opt->num_queue; i += 2) {
uint32_t num_queues;
uint32_t thread_a_idx = i % opt->num_cpu;
thread_args_t *thread_a_args = &global->thread_args[thread_a_idx];
thread_args_t *thread_b_args = &global->thread_args[thread_a_idx + 1];

num_queues = thread_a_args->num_queues;
thread_a_args->src_queue_id[num_queues] = i;
thread_a_args->dst_queue_id[num_queues] = i + 1;
thread_a_args->num_queues++;

thread_args->src_queue_id[queue_idx] = i;
thread_args->dst_queue_id[queue_idx] = i;
thread_args->num_queues++;
num_queues = thread_b_args->num_queues;
thread_b_args->src_queue_id[num_queues] = i + 1;
thread_b_args->dst_queue_id[num_queues] = i;
thread_b_args->num_queues++;
}
}

Expand Down

0 comments on commit 6aa0ab7

Please sign in to comment.