Skip to content

Commit 9f71dca

Browse files
Stanislav Fomichevkuba-moo
Stanislav Fomichev
authored andcommitted
introduce rx and tx session modes
And convert existing --msg-trunc and --msg-zerocopy to this new message. Potentially io_uring can be a separate mode as well. Signed-off-by: Stanislav Fomichev <[email protected]>
1 parent 4bc9294 commit 9f71dca

File tree

6 files changed

+124
-13
lines changed

6 files changed

+124
-13
lines changed

client.c

+18-2
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,8 @@ dump_result_machine(struct kpm_test_results *result, const char *dir,
532532

533533
int main(int argc, char *argv[])
534534
{
535+
enum kpm_rx_mode rx_mode = KPM_RX_MODE_SOCKET;
536+
enum kpm_tx_mode tx_mode = KPM_TX_MODE_SOCKET;
535537
unsigned int src_ncpus, dst_ncpus;
536538
struct __kpm_generic_u32 *ack_id;
537539
__u32 *src_wrk_cpu, *dst_wrk_cpu;
@@ -607,6 +609,22 @@ int main(int argc, char *argv[])
607609
goto out;
608610
}
609611

612+
if (opt.msg_trunc)
613+
rx_mode = KPM_RX_MODE_SOCKET_TRUNC;
614+
615+
if (opt.msg_zerocopy)
616+
tx_mode = KPM_TX_MODE_SOCKET_ZEROCOPY;
617+
618+
if (kpm_req_mode(dst, rx_mode, tx_mode) < 0) {
619+
warnx("Failed setup destination mode");
620+
goto out;
621+
}
622+
623+
if (kpm_req_mode(src, rx_mode, tx_mode) < 0) {
624+
warnx("Failed setup source mode");
625+
goto out;
626+
}
627+
610628
conns = spawn_conn(src, dst, &conn_addr, len);
611629
if (!conns)
612630
goto out;
@@ -677,8 +695,6 @@ int main(int argc, char *argv[])
677695
test->specs[i].worker_id = dst_wrk_id[i];
678696
test->specs[i].read_size = opt.read_size;
679697
test->specs[i].write_size = opt.write_size;
680-
test->specs[i].msg_trunc = opt.msg_trunc;
681-
test->specs[i].msg_zerocopy = opt.msg_zerocopy;
682698
if (opt.req_size == ~0U) {
683699
test->specs[i].type = KPM_TEST_TYPE_STREAM;
684700
} else {

proto.c

+38
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,16 @@ int kpm_send_tcp_cc(int fd, __u32 id, char *cc_name)
180180
return kpm_send(fd, &msg.hdr, sizeof(msg), KPM_MSG_TYPE_TCP_CC);
181181
}
182182

183+
int kpm_send_mode(int fd, enum kpm_rx_mode rx_mode, enum kpm_tx_mode tx_mode)
184+
{
185+
struct kpm_mode msg = {};
186+
187+
msg.rx_mode = rx_mode;
188+
msg.tx_mode = tx_mode;
189+
190+
return kpm_send(fd, &msg.hdr, sizeof(msg), KPM_MSG_TYPE_MODE);
191+
}
192+
183193
int kpm_send_pin_worker(int fd, __u32 id, __u32 cpu)
184194
{
185195
struct kpm_pin_worker msg;
@@ -443,6 +453,34 @@ kpm_req_tcp_cc(int fd, __u32 conn_id, char *cc_name)
443453
return 0;
444454
}
445455

456+
int
457+
kpm_req_mode(int fd, enum kpm_rx_mode rx_mode, enum kpm_tx_mode tx_mode)
458+
{
459+
struct kpm_empty *repl;
460+
int id;
461+
462+
id = kpm_send_mode(fd, rx_mode, tx_mode);
463+
if (id < 0) {
464+
warnx("Failed to request mode");
465+
return id;
466+
}
467+
468+
repl = kpm_receive(fd);
469+
if (!repl) {
470+
warnx("Failed to request mode - no response");
471+
return -1;
472+
}
473+
474+
if (!kpm_good_reply(repl, KPM_MSG_TYPE_MODE, id)) {
475+
warnx("Failed to request mode - bad reply");
476+
free(repl);
477+
return -1;
478+
}
479+
480+
free(repl);
481+
return 0;
482+
}
483+
446484
int kpm_req_disconnect(int fd, __u32 connection_id)
447485
{
448486
struct kpm_empty *repl;

proto.h

+19-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ enum kpm_msg_type {
2525
KPM_MSG_TYPE_TLS,
2626
KPM_MSG_TYPE_MAX_PACING,
2727
KPM_MSG_TYPE_TCP_CC,
28+
KPM_MSG_TYPE_MODE,
2829
KPM_MSG_TYPE_TEST,
2930
KPM_MSG_TYPE_TEST_RESULT,
3031
KPM_MSG_TYPE_END_TEST,
@@ -121,6 +122,22 @@ struct kpm_tcp_cc {
121122
char cc_name[KPM_CC_NAME_LEN];
122123
};
123124

125+
enum kpm_rx_mode {
126+
KPM_RX_MODE_SOCKET,
127+
KPM_RX_MODE_SOCKET_TRUNC,
128+
};
129+
130+
enum kpm_tx_mode {
131+
KPM_TX_MODE_SOCKET,
132+
KPM_TX_MODE_SOCKET_ZEROCOPY,
133+
};
134+
135+
struct kpm_mode {
136+
struct kpm_header hdr;
137+
enum kpm_rx_mode rx_mode;
138+
enum kpm_tx_mode tx_mode;
139+
};
140+
124141
enum kpm_tls_mask {
125142
KPM_TLS_ULP = 1,
126143
KPM_TLS_TX = 2,
@@ -164,8 +181,6 @@ struct kpm_test {
164181
enum kpm_test_type type;
165182
__u32 read_size;
166183
__u32 write_size;
167-
__u32 msg_trunc:1;
168-
__u32 msg_zerocopy:1;
169184
union kpm_test_arg {
170185
struct {
171186
__u32 req_size;
@@ -254,6 +269,7 @@ int kpm_send_tls(int fd, __u32 conn_id, __u32 dir_mask,
254269
void *info, socklen_t len);
255270
int kpm_send_max_pacing(int fd, __u32 id, __u32 max_pace);
256271
int kpm_send_tcp_cc(int fd, __u32 id, char *cc_name);
272+
int kpm_send_mode(int fd, enum kpm_rx_mode rx_mode, enum kpm_tx_mode tx_mode);
257273
int kpm_send_pin_worker(int fd, __u32 id, __u32 cpu);
258274

259275
void kpm_reply_error(int fd, struct kpm_header *hdr, __u16 error);
@@ -276,6 +292,7 @@ int kpm_req_tls(int fd, __u32 conn_id, __u32 dir_mask,
276292
void *info, socklen_t len);
277293
int kpm_req_pacing(int fd, __u32 conn_id, __u32 max_pace);
278294
int kpm_req_tcp_cc(int fd, __u32 conn_id, char *cc_name);
295+
int kpm_req_mode(int fd, enum kpm_rx_mode rx_mode, enum kpm_tx_mode tx_mode);
279296
int kpm_req_disconnect(int fd, __u32 connection_id);
280297

281298
#endif /* PROTO_H */

server.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <ccan/compiler/compiler.h>
1111
#include <ccan/list/list.h>
1212

13+
#include "proto.h"
14+
1315
struct server_session {
1416
int cfd;
1517
pid_t pid;
@@ -19,6 +21,6 @@ struct server_session {
1921
struct server_session *
2022
server_session_spawn(int fd, struct sockaddr_in6 *addr, socklen_t *addrlen);
2123

22-
void NORETURN pworker_main(int fd);
24+
void NORETURN pworker_main(int fd, enum kpm_rx_mode rx_mode, enum kpm_tx_mode tx_mode);
2325

2426
#endif /* SERVER_H */

server_session.c

+31-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ struct session_state {
3333
int epollfd;
3434
int quit;
3535
int tcp_sock;
36+
enum kpm_rx_mode rx_mode;
37+
enum kpm_tx_mode tx_mode;
3638
unsigned int connection_ids;
3739
unsigned int worker_ids;
3840
unsigned int test_ids;
@@ -495,6 +497,31 @@ server_msg_tcp_cc(struct session_state *self, struct kpm_header *hdr)
495497
self->quit = 1;
496498
}
497499

500+
static void
501+
server_msg_mode(struct session_state *self, struct kpm_header *hdr)
502+
{
503+
struct kpm_mode *req;
504+
505+
if (hdr->len < sizeof(*req)) {
506+
warn("Invalid request in %s", __func__);
507+
goto err_quit;
508+
}
509+
req = (void *)hdr;
510+
511+
self->rx_mode = req->rx_mode;
512+
self->tx_mode = req->tx_mode;
513+
514+
if (kpm_reply_empty(self->main_sock, hdr) < 1) {
515+
warnx("Reply failed");
516+
goto err_quit;
517+
}
518+
519+
return;
520+
521+
err_quit:
522+
self->quit = 1;
523+
}
524+
498525
static void
499526
server_msg_spawn_pworker(struct session_state *self, struct kpm_header *hdr)
500527
{
@@ -519,7 +546,7 @@ server_msg_spawn_pworker(struct session_state *self, struct kpm_header *hdr)
519546
}
520547
if (!pwrk->pid) {
521548
close(p[0]);
522-
pworker_main(p[1]);
549+
pworker_main(p[1], self->rx_mode, self->tx_mode);
523550
exit(1);
524551
}
525552

@@ -788,6 +815,9 @@ static void session_handle_main_sock(struct session_state *self)
788815
case KPM_MSG_TYPE_TCP_CC:
789816
server_msg_tcp_cc(self, hdr);
790817
break;
818+
case KPM_MSG_TYPE_MODE:
819+
server_msg_mode(self, hdr);
820+
break;
791821
case KPM_MSG_TYPE_SPAWN_PWORKER:
792822
server_msg_spawn_pworker(self, hdr);
793823
break;

worker.c

+15-7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
/* Main worker state AKA self */
2929
struct worker_state {
3030
int main_sock;
31+
enum kpm_rx_mode rx_mode;
32+
enum kpm_tx_mode tx_mode;
3133
int epollfd;
3234
unsigned int id;
3335
int quit;
@@ -319,7 +321,7 @@ worker_msg_test(struct worker_state *self, struct kpm_header *hdr)
319321
else
320322
conn->to_recv = len;
321323

322-
zc = !!conn->spec->msg_zerocopy;
324+
zc = self->tx_mode == KPM_TX_MODE_SOCKET_ZEROCOPY;
323325
if (setsockopt(conn->fd, SOL_SOCKET, SO_ZEROCOPY, &zc, sizeof(zc))) {
324326
warnx("Failed to set SO_ZEROCOPY");
325327
self->quit = 1;
@@ -570,7 +572,8 @@ worker_handle_send(struct worker_state *self, struct connection *conn,
570572
unsigned int events)
571573
{
572574
unsigned int rep = max_t(int, 10, conn->to_send / conn->write_size + 1);
573-
int flags = conn->spec->msg_zerocopy ? MSG_ZEROCOPY : 0;
575+
bool msg_zerocopy = self->tx_mode == KPM_TX_MODE_SOCKET_ZEROCOPY;
576+
int flags = msg_zerocopy ? MSG_ZEROCOPY : 0;
574577

575578
while (rep--) {
576579
void *src = &patbuf[conn->tot_sent % PATTERN_PERIOD];
@@ -598,7 +601,7 @@ worker_handle_send(struct worker_state *self, struct connection *conn,
598601

599602
conn->to_send -= n;
600603
conn->tot_sent += n;
601-
if (conn->spec->msg_zerocopy) {
604+
if (msg_zerocopy) {
602605
conn->to_send_comp += 1;
603606
kpm_dbg("queued send completion, total %d",
604607
conn->to_send_comp);
@@ -619,13 +622,14 @@ worker_handle_send(struct worker_state *self, struct connection *conn,
619622

620623
static ssize_t worker_handle_regular_recv(struct worker_state *self, struct connection *conn, size_t chunk, int rep)
621624
{
625+
bool msg_trunc = self->rx_mode == KPM_RX_MODE_SOCKET_TRUNC;
622626
void *src = &patbuf[conn->tot_recv % PATTERN_PERIOD];
623-
int flags = conn->spec->msg_trunc ? MSG_TRUNC : 0;
627+
int flags = msg_trunc ? MSG_TRUNC : 0;
624628
ssize_t n;
625629

626630
n = recv(conn->fd, conn->rxbuf, chunk, MSG_DONTWAIT | flags);
627631

628-
if (n <= 0 || conn->spec->msg_trunc)
632+
if (n <= 0 || msg_trunc)
629633
return n;
630634

631635
if (memcmp(conn->rxbuf, src, n))
@@ -709,9 +713,13 @@ worker_handle_conn(struct worker_state *self, int fd, unsigned int events)
709713

710714
/* == Main loop == */
711715

712-
void NORETURN pworker_main(int fd)
716+
void NORETURN pworker_main(int fd, enum kpm_rx_mode rx_mode, enum kpm_tx_mode tx_mode)
713717
{
714-
struct worker_state self = { .main_sock = fd, };
718+
struct worker_state self = {
719+
.main_sock = fd,
720+
.rx_mode = rx_mode,
721+
.tx_mode = tx_mode,
722+
};
715723
struct epoll_event ev, events[32];
716724
unsigned char j;
717725
int i, nfds;

0 commit comments

Comments
 (0)