Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Seq write #218

Open
wants to merge 2 commits into
base: next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ntirpc/rpc/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ typedef int32_t rpc_inline_t;
#define TIRPC_SET_DEBUG_FLAGS 3
#define TIRPC_GET_OTHER_FLAGS 4
#define TIRPC_SET_OTHER_FLAGS 5
#define TIRPC_SET_CONT_RECV 6

/*
* Debug flags support
Expand Down Expand Up @@ -182,6 +183,7 @@ typedef struct tirpc_pkg_params {
mem_2_size_t aligned_;
mem_2_size_t calloc_;
mem_p_size_t realloc_;
uint32_t cont_recv_limit;
} tirpc_pkg_params;

extern tirpc_pkg_params __ntirpc_pkg_params;
Expand Down
4 changes: 4 additions & 0 deletions src/rpc_generic.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ tirpc_pkg_params __ntirpc_pkg_params = {
tirpc_aligned,
tirpc_calloc,
tirpc_realloc,
64,
};

bool
Expand All @@ -161,6 +162,9 @@ tirpc_control(const u_int rq, void *in)
case TIRPC_SET_OTHER_FLAGS:
__ntirpc_pkg_params.other_flags = *(int *)in;
break;
case TIRPC_SET_CONT_RECV:
__ntirpc_pkg_params.cont_recv_limit = *(uint32_t *)in;
break;
default:
return (false);
}
Expand Down
6 changes: 6 additions & 0 deletions src/svc_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
extern int __svc_maxiov;
extern int __svc_maxrec;

struct svc_request_params {
SVCXPRT *xprt;
XDR *xdrs;
};

/* threading fdsets around is annoying */
struct svc_params {
mutex_t mtx;
Expand Down Expand Up @@ -77,6 +82,7 @@ struct svc_params {
int32_t idle_timeout;
};

void svc_request_async(struct work_pool_entry *wpe);
enum xprt_stat svc_request(SVCXPRT *xprt, XDR *xdrs);

extern struct svc_params __svc_params[1];
Expand Down
9 changes: 9 additions & 0 deletions src/svc_rqst.c
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,15 @@ enum xprt_stat svc_request(SVCXPRT *xprt, XDR *xdrs)
return stat;
}

void svc_request_async(struct work_pool_entry *wpe)
{
struct svc_request_params *wpe_arg = wpe->arg;

svc_request(wpe_arg->xprt, wpe_arg->xdrs);
mem_free(wpe->arg, sizeof(struct svc_request_params));
mem_free(wpe, sizeof(struct work_pool_entry));
}

static void svc_resume_task(struct work_pool_entry *wpe)
{
struct svc_req *req =
Expand Down
203 changes: 113 additions & 90 deletions src/svc_vc.c
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,73 @@ svc_vc_stat(SVCXPRT *xprt)
return (XPRT_IDLE);
}

int svc_xprt_recv(SVCXPRT *xprt, char *dst, int len, int recv_flags)
{
int rlen;
int err;
int loglevel = (recv_flags == MSG_WAITALL) ?
TIRPC_DEBUG_FLAG_WARN : TIRPC_DEBUG_FLAG_SVC_VC;

rlen = recv(xprt->xp_fd, dst, len, recv_flags);
if (unlikely(rlen < 0)) {
err = errno;

if (err == EAGAIN || err == EWOULDBLOCK) {
__warnx(loglevel,
"%s: %p fd %d recv errno %d (try again)",
"svc_vc_wait", xprt, xprt->xp_fd, err);
if (unlikely(svc_rqst_rearm_events(xprt,
SVC_XPRT_FLAG_ADDED_RECV))) {
__warnx(TIRPC_DEBUG_FLAG_ERROR,
"%s: %p fd %d svc_rqst_rearm_events failed (will set dead)",
"svc_vc_wait",
xprt, xprt->xp_fd);
SVC_DESTROY(xprt);
err = EINVAL;
}
#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_exit, __func__, __LINE__,
xprt, "EAGAIN", err);
#endif /* USE_LTTNG_NTIRPC */
return rlen;
}
__warnx(TIRPC_DEBUG_FLAG_WARN,
"%s: %p fd %d recv errno %d (will set dead)",
"svc_vc_wait", xprt, xprt->xp_fd, err);
SVC_DESTROY(xprt);
#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_exit, __func__, __LINE__,
xprt, "ERROR", err);
#endif /* USE_LTTNG_NTIRPC */
return rlen;
}

if (unlikely(!rlen)) {
__warnx(TIRPC_DEBUG_FLAG_SVC_VC,
"%s: %p fd %d recv closed (will set dead)",
"svc_vc_wait", xprt, xprt->xp_fd);
SVC_DESTROY(xprt);
#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_exit, __func__, __LINE__,
xprt, "EMPTY", 0);
#endif /* USE_LTTNG_NTIRPC */
}
return rlen;
}

#define CONT_RECV_RPC_SIZE 4096

static inline bool cont_recv(SVCXPRT *xprt, int rcv_len, int recv_counter)
{
bool out = false;

if (rcv_len > CONT_RECV_RPC_SIZE &&
(recv_counter < __ntirpc_pkg_params.cont_recv_limit) &&
!(xprt->xp_flags & (SVC_XPRT_FLAG_DESTROYED)))
out = true;
return out;
}

static enum xprt_stat
svc_vc_recv(SVCXPRT *xprt)
{
Expand All @@ -669,14 +736,18 @@ svc_vc_recv(SVCXPRT *xprt)
struct poolq_entry *have;
struct xdr_ioq_uv *uv;
struct xdr_ioq *xioq;
struct work_pool_entry *wpe;
struct svc_request_params *wpe_arg;
int recv_counter = 0, trlen;
ssize_t rlen;
u_int flags;
int code;

#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, funcin, __func__, __LINE__, xprt);
#endif /* USE_LTTNG_NTIRPC */

restart:
trlen = 0;
/* no need for locking, only one svc_rqst_xprt_task() per event.
* depends upon svc_rqst_rearm_events() for ordering.
*/
Expand All @@ -691,55 +762,30 @@ svc_vc_recv(SVCXPRT *xprt)
}

if (!xd->sx_fbtbc) {
rlen = recv(xprt->xp_fd, &xd->sx_fbtbc, BYTES_PER_XDR_UNIT,
MSG_WAITALL);

if (unlikely(rlen < 0)) {
code = errno;

if (code == EAGAIN || code == EWOULDBLOCK) {
__warnx(TIRPC_DEBUG_FLAG_WARN,
"%s: %p fd %d recv errno %d (try again)",
"svc_vc_wait", xprt, xprt->xp_fd, code);
if (unlikely(svc_rqst_rearm_events(
xprt,
SVC_XPRT_FLAG_ADDED_RECV))) {
__warnx(TIRPC_DEBUG_FLAG_ERROR,
"%s: %p fd %d svc_rqst_rearm_events failed (will set dead)",
"svc_vc_wait",
xprt, xprt->xp_fd);
SVC_DESTROY(xprt);
code = EINVAL;
}
#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_exit, __func__, __LINE__,
xprt, "EAGAIN", code);
#endif /* USE_LTTNG_NTIRPC */
return SVC_STAT(xprt);
if (recv_counter) {
/* If this is not the first recv call, we cannot use
* MSG_WAITALL as the current thread will block if
* the data is not available on the socket. The below
* while loop handles the pathalogical case when less
* than BYTES_PER_XDR_UNIT bytes of data is read in
* one recv call.
*/
while (trlen < BYTES_PER_XDR_UNIT) {
rlen = svc_xprt_recv(xprt,
(char *)&xd->sx_fbtbc + trlen,
BYTES_PER_XDR_UNIT - trlen,
MSG_DONTWAIT);
if (rlen <= 0)
break;
trlen += rlen;
}
__warnx(TIRPC_DEBUG_FLAG_WARN,
"%s: %p fd %d recv errno %d (will set dead)",
"svc_vc_wait", xprt, xprt->xp_fd, code);
SVC_DESTROY(xprt);
#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_exit, __func__, __LINE__,
xprt, "ERROR", code);
#endif /* USE_LTTNG_NTIRPC */
return SVC_STAT(xprt);
} else {
rlen = svc_xprt_recv(xprt, (char *)&xd->sx_fbtbc,
BYTES_PER_XDR_UNIT, MSG_WAITALL);
}

if (unlikely(!rlen)) {
__warnx(TIRPC_DEBUG_FLAG_SVC_VC,
"%s: %p fd %d recv closed (will set dead)",
"svc_vc_wait", xprt, xprt->xp_fd);
SVC_DESTROY(xprt);
#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_exit, __func__, __LINE__,
xprt, "EMPTY", 0);
#endif /* USE_LTTNG_NTIRPC */
if (rlen <= 0) {
return SVC_STAT(xprt);
}

xd->sx_fbtbc = (int32_t)ntohl((long)xd->sx_fbtbc);
flags = UIO_FLAG_FREE | UIO_FLAG_MORE;

Expand Down Expand Up @@ -773,53 +819,12 @@ svc_vc_recv(SVCXPRT *xprt)
flags = uv->u.uio_flags;
}

rlen = recv(xprt->xp_fd, uv->v.vio_tail, xd->sx_fbtbc, MSG_DONTWAIT);
rlen = svc_xprt_recv(xprt, (char *)uv->v.vio_tail,
xd->sx_fbtbc, MSG_DONTWAIT);

if (unlikely(rlen < 0)) {
code = errno;

if (code == EAGAIN || code == EWOULDBLOCK) {
__warnx(TIRPC_DEBUG_FLAG_SVC_VC,
"%s: %p fd %d recv errno %d (try again)",
__func__, xprt, xprt->xp_fd, code);
if (unlikely(svc_rqst_rearm_events(
xprt,
SVC_XPRT_FLAG_ADDED_RECV))) {
__warnx(TIRPC_DEBUG_FLAG_ERROR,
"%s: %p fd %d svc_rqst_rearm_events failed (will set dead)",
__func__, xprt, xprt->xp_fd);
SVC_DESTROY(xprt);
code = EINVAL;
}
#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_exit, __func__, __LINE__,
xprt, "EAGAIN", code);
#endif /* USE_LTTNG_NTIRPC */
return SVC_STAT(xprt);
}
__warnx(TIRPC_DEBUG_FLAG_ERROR,
"%s: %p fd %d recv errno %d (will set dead)",
__func__, xprt, xprt->xp_fd, code);
SVC_DESTROY(xprt);
#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_exit, __func__, __LINE__,
xprt, "ERROR", code);
#endif /* USE_LTTNG_NTIRPC */
if (rlen <= 0) {
return SVC_STAT(xprt);
}

if (unlikely(!rlen)) {
__warnx(TIRPC_DEBUG_FLAG_SVC_VC,
"%s: %p fd %d recv closed (will set dead)",
__func__, xprt, xprt->xp_fd);
SVC_DESTROY(xprt);
#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_exit, __func__, __LINE__,
xprt, "EMPTY", 0);
#endif /* USE_LTTNG_NTIRPC */
return SVC_STAT(xprt);
}

#ifdef USE_LTTNG_NTIRPC
tracepoint(xprt, recv_bytes, __func__, __LINE__,
xprt, xd->sx_fbtbc, rlen);
Expand All @@ -833,6 +838,9 @@ svc_vc_recv(SVCXPRT *xprt)
__func__, xprt, xprt->xp_fd, rlen, xd->sx_fbtbc, flags);

if (xd->sx_fbtbc || (flags & UIO_FLAG_MORE)) {
if (cont_recv(xprt, rlen + xd->sx_fbtbc, recv_counter++)) {
goto restart;
}
if (unlikely(svc_rqst_rearm_events(xprt,
SVC_XPRT_FLAG_ADDED_RECV))) {
__warnx(TIRPC_DEBUG_FLAG_ERROR,
Expand All @@ -857,6 +865,21 @@ svc_vc_recv(SVCXPRT *xprt)
TAILQ_REMOVE(&rec->ioq.ioq_uv.uvqh.qh, &xioq->ioq_s, q);
xdr_ioq_reset(xioq, 0);

if (cont_recv(xprt, rlen, recv_counter++)) {
/* Submit the request to a new thread while the current
* thread will continue reading from the socket without
* adding to epoll fd.
*/
SVC_REF(xprt, SVC_REF_FLAG_NONE);
wpe = mem_alloc(sizeof(struct work_pool_entry));
wpe_arg = mem_alloc(sizeof(struct svc_request_params));
wpe_arg->xprt = xprt;
wpe_arg->xdrs = xioq->xdrs;
wpe->fun = svc_request_async;
wpe->arg = wpe_arg;
work_pool_submit(&svc_work_pool, wpe);
goto restart;
}
if (unlikely(svc_rqst_rearm_events(xprt, SVC_XPRT_FLAG_ADDED_RECV))) {
__warnx(TIRPC_DEBUG_FLAG_ERROR,
"%s: %p fd %d svc_rqst_rearm_events failed (will set dead)",
Expand Down