diff --git a/ntirpc/rpc/types.h b/ntirpc/rpc/types.h index 60db0f675..b2361a330 100644 --- a/ntirpc/rpc/types.h +++ b/ntirpc/rpc/types.h @@ -115,6 +115,7 @@ typedef int32_t rpc_inline_t; #define TIRPC_SET_DEBUG_FLAGS 3 #define TIRPC_GET_OTHER_FLAGS 4 #define TIRPC_SET_OTHER_FLAGS 5 +#define TIRPC_SET_CONT_RECV 6 /* * Debug flags support @@ -182,6 +183,7 @@ typedef struct tirpc_pkg_params { mem_2_size_t aligned_; mem_2_size_t calloc_; mem_p_size_t realloc_; + uint32_t cont_recv_limit; } tirpc_pkg_params; extern tirpc_pkg_params __ntirpc_pkg_params; diff --git a/src/rpc_generic.c b/src/rpc_generic.c index 0b237f330..5c2597b39 100644 --- a/src/rpc_generic.c +++ b/src/rpc_generic.c @@ -137,6 +137,7 @@ tirpc_pkg_params __ntirpc_pkg_params = { tirpc_aligned, tirpc_calloc, tirpc_realloc, + 64, }; bool @@ -161,6 +162,9 @@ tirpc_control(const u_int rq, void *in) case TIRPC_SET_OTHER_FLAGS: __ntirpc_pkg_params.other_flags = *(int *)in; break; + case TIRPC_SET_CONT_RECV: + __ntirpc_pkg_params.cont_recv_limit = *(uint32_t *)in; + break; default: return (false); } diff --git a/src/svc_internal.h b/src/svc_internal.h index 682e291bf..dcaa7a21c 100644 --- a/src/svc_internal.h +++ b/src/svc_internal.h @@ -37,6 +37,11 @@ extern int __svc_maxiov; extern int __svc_maxrec; +struct svc_request_params { + SVCXPRT *xprt; + XDR *xdrs; +}; + /* threading fdsets around is annoying */ struct svc_params { mutex_t mtx; @@ -77,6 +82,7 @@ struct svc_params { int32_t idle_timeout; }; +void svc_request_async(struct work_pool_entry *wpe); enum xprt_stat svc_request(SVCXPRT *xprt, XDR *xdrs); extern struct svc_params __svc_params[1]; diff --git a/src/svc_rqst.c b/src/svc_rqst.c index 1264e2233..228754ec0 100644 --- a/src/svc_rqst.c +++ b/src/svc_rqst.c @@ -1219,6 +1219,15 @@ enum xprt_stat svc_request(SVCXPRT *xprt, XDR *xdrs) return stat; } +void svc_request_async(struct work_pool_entry *wpe) +{ + struct svc_request_params *wpe_arg = wpe->arg; + + svc_request(wpe_arg->xprt, wpe_arg->xdrs); + 
mem_free(wpe->arg, sizeof(struct svc_request_params)); + mem_free(wpe, sizeof(struct work_pool_entry)); +} + static void svc_resume_task(struct work_pool_entry *wpe) { struct svc_req *req = diff --git a/src/svc_vc.c b/src/svc_vc.c index bec4b6e38..78e09904f 100644 --- a/src/svc_vc.c +++ b/src/svc_vc.c @@ -661,6 +661,73 @@ svc_vc_stat(SVCXPRT *xprt) return (XPRT_IDLE); } +int svc_xprt_recv(SVCXPRT *xprt, char *dst, int len, int recv_flags) +{ + int rlen; + int err; + int loglevel = (recv_flags == MSG_WAITALL) ? + TIRPC_DEBUG_FLAG_WARN : TIRPC_DEBUG_FLAG_SVC_VC; + + rlen = recv(xprt->xp_fd, dst, len, recv_flags); + if (unlikely(rlen < 0)) { + err = errno; + + if (err == EAGAIN || err == EWOULDBLOCK) { + __warnx(loglevel, + "%s: %p fd %d recv errno %d (try again)", + "svc_vc_wait", xprt, xprt->xp_fd, err); + if (unlikely(svc_rqst_rearm_events(xprt, + SVC_XPRT_FLAG_ADDED_RECV))) { + __warnx(TIRPC_DEBUG_FLAG_ERROR, + "%s: %p fd %d svc_rqst_rearm_events failed (will set dead)", + "svc_vc_wait", + xprt, xprt->xp_fd); + SVC_DESTROY(xprt); + err = EINVAL; + } +#ifdef USE_LTTNG_NTIRPC + tracepoint(xprt, recv_exit, __func__, __LINE__, + xprt, "EAGAIN", err); +#endif /* USE_LTTNG_NTIRPC */ + return rlen; + } + __warnx(TIRPC_DEBUG_FLAG_WARN, + "%s: %p fd %d recv errno %d (will set dead)", + "svc_vc_wait", xprt, xprt->xp_fd, err); + SVC_DESTROY(xprt); +#ifdef USE_LTTNG_NTIRPC + tracepoint(xprt, recv_exit, __func__, __LINE__, + xprt, "ERROR", err); +#endif /* USE_LTTNG_NTIRPC */ + return rlen; + } + + if (unlikely(!rlen)) { + __warnx(TIRPC_DEBUG_FLAG_SVC_VC, + "%s: %p fd %d recv closed (will set dead)", + "svc_vc_wait", xprt, xprt->xp_fd); + SVC_DESTROY(xprt); +#ifdef USE_LTTNG_NTIRPC + tracepoint(xprt, recv_exit, __func__, __LINE__, + xprt, "EMPTY", 0); +#endif /* USE_LTTNG_NTIRPC */ + } + return rlen; +} + +#define CONT_RECV_RPC_SIZE 4096 + +static inline bool cont_recv(SVCXPRT *xprt, int rcv_len, int recv_counter) +{ + bool out = false; + + if (rcv_len > CONT_RECV_RPC_SIZE && 
+ (recv_counter < __ntirpc_pkg_params.cont_recv_limit) && + !(xprt->xp_flags & (SVC_XPRT_FLAG_DESTROYED))) + out = true; + return out; +} + static enum xprt_stat svc_vc_recv(SVCXPRT *xprt) { @@ -669,14 +736,18 @@ svc_vc_recv(SVCXPRT *xprt) struct poolq_entry *have; struct xdr_ioq_uv *uv; struct xdr_ioq *xioq; + struct work_pool_entry *wpe; + struct svc_request_params *wpe_arg; + int recv_counter = 0, trlen; ssize_t rlen; u_int flags; - int code; #ifdef USE_LTTNG_NTIRPC tracepoint(xprt, funcin, __func__, __LINE__, xprt); #endif /* USE_LTTNG_NTIRPC */ +restart: + trlen = 0; /* no need for locking, only one svc_rqst_xprt_task() per event. * depends upon svc_rqst_rearm_events() for ordering. */ @@ -691,55 +762,30 @@ svc_vc_recv(SVCXPRT *xprt) } if (!xd->sx_fbtbc) { - rlen = recv(xprt->xp_fd, &xd->sx_fbtbc, BYTES_PER_XDR_UNIT, - MSG_WAITALL); - - if (unlikely(rlen < 0)) { - code = errno; - - if (code == EAGAIN || code == EWOULDBLOCK) { - __warnx(TIRPC_DEBUG_FLAG_WARN, - "%s: %p fd %d recv errno %d (try again)", - "svc_vc_wait", xprt, xprt->xp_fd, code); - if (unlikely(svc_rqst_rearm_events( - xprt, - SVC_XPRT_FLAG_ADDED_RECV))) { - __warnx(TIRPC_DEBUG_FLAG_ERROR, - "%s: %p fd %d svc_rqst_rearm_events failed (will set dead)", - "svc_vc_wait", - xprt, xprt->xp_fd); - SVC_DESTROY(xprt); - code = EINVAL; - } -#ifdef USE_LTTNG_NTIRPC - tracepoint(xprt, recv_exit, __func__, __LINE__, - xprt, "EAGAIN", code); -#endif /* USE_LTTNG_NTIRPC */ - return SVC_STAT(xprt); + if (recv_counter) { + /* If this is not the first recv call, we cannot use + * MSG_WAITALL as the current thread will block if + * the data is not available on the socket. The below + * while loop handles the pathological case when less + * than BYTES_PER_XDR_UNIT bytes of data is read in + * one recv call. 
+ */ + while (trlen < BYTES_PER_XDR_UNIT) { + rlen = svc_xprt_recv(xprt, + (char *)&xd->sx_fbtbc + trlen, + BYTES_PER_XDR_UNIT - trlen, + MSG_DONTWAIT); + if (rlen <= 0) + break; + trlen += rlen; } - __warnx(TIRPC_DEBUG_FLAG_WARN, - "%s: %p fd %d recv errno %d (will set dead)", - "svc_vc_wait", xprt, xprt->xp_fd, code); - SVC_DESTROY(xprt); -#ifdef USE_LTTNG_NTIRPC - tracepoint(xprt, recv_exit, __func__, __LINE__, - xprt, "ERROR", code); -#endif /* USE_LTTNG_NTIRPC */ - return SVC_STAT(xprt); + } else { + rlen = svc_xprt_recv(xprt, (char *)&xd->sx_fbtbc, + BYTES_PER_XDR_UNIT, MSG_WAITALL); } - - if (unlikely(!rlen)) { - __warnx(TIRPC_DEBUG_FLAG_SVC_VC, - "%s: %p fd %d recv closed (will set dead)", - "svc_vc_wait", xprt, xprt->xp_fd); - SVC_DESTROY(xprt); -#ifdef USE_LTTNG_NTIRPC - tracepoint(xprt, recv_exit, __func__, __LINE__, - xprt, "EMPTY", 0); -#endif /* USE_LTTNG_NTIRPC */ + if (rlen <= 0) { return SVC_STAT(xprt); } - xd->sx_fbtbc = (int32_t)ntohl((long)xd->sx_fbtbc); flags = UIO_FLAG_FREE | UIO_FLAG_MORE; @@ -773,53 +819,12 @@ svc_vc_recv(SVCXPRT *xprt) flags = uv->u.uio_flags; } - rlen = recv(xprt->xp_fd, uv->v.vio_tail, xd->sx_fbtbc, MSG_DONTWAIT); + rlen = svc_xprt_recv(xprt, (char *)uv->v.vio_tail, + xd->sx_fbtbc, MSG_DONTWAIT); - if (unlikely(rlen < 0)) { - code = errno; - - if (code == EAGAIN || code == EWOULDBLOCK) { - __warnx(TIRPC_DEBUG_FLAG_SVC_VC, - "%s: %p fd %d recv errno %d (try again)", - __func__, xprt, xprt->xp_fd, code); - if (unlikely(svc_rqst_rearm_events( - xprt, - SVC_XPRT_FLAG_ADDED_RECV))) { - __warnx(TIRPC_DEBUG_FLAG_ERROR, - "%s: %p fd %d svc_rqst_rearm_events failed (will set dead)", - __func__, xprt, xprt->xp_fd); - SVC_DESTROY(xprt); - code = EINVAL; - } -#ifdef USE_LTTNG_NTIRPC - tracepoint(xprt, recv_exit, __func__, __LINE__, - xprt, "EAGAIN", code); -#endif /* USE_LTTNG_NTIRPC */ - return SVC_STAT(xprt); - } - __warnx(TIRPC_DEBUG_FLAG_ERROR, - "%s: %p fd %d recv errno %d (will set dead)", - __func__, xprt, xprt->xp_fd, code); 
- SVC_DESTROY(xprt); -#ifdef USE_LTTNG_NTIRPC - tracepoint(xprt, recv_exit, __func__, __LINE__, - xprt, "ERROR", code); -#endif /* USE_LTTNG_NTIRPC */ + if (rlen <= 0) { return SVC_STAT(xprt); } - - if (unlikely(!rlen)) { - __warnx(TIRPC_DEBUG_FLAG_SVC_VC, - "%s: %p fd %d recv closed (will set dead)", - __func__, xprt, xprt->xp_fd); - SVC_DESTROY(xprt); -#ifdef USE_LTTNG_NTIRPC - tracepoint(xprt, recv_exit, __func__, __LINE__, - xprt, "EMPTY", 0); -#endif /* USE_LTTNG_NTIRPC */ - return SVC_STAT(xprt); - } - #ifdef USE_LTTNG_NTIRPC tracepoint(xprt, recv_bytes, __func__, __LINE__, xprt, xd->sx_fbtbc, rlen); @@ -833,6 +838,9 @@ svc_vc_recv(SVCXPRT *xprt) __func__, xprt, xprt->xp_fd, rlen, xd->sx_fbtbc, flags); if (xd->sx_fbtbc || (flags & UIO_FLAG_MORE)) { + if (cont_recv(xprt, rlen + xd->sx_fbtbc, recv_counter++)) { + goto restart; + } if (unlikely(svc_rqst_rearm_events(xprt, SVC_XPRT_FLAG_ADDED_RECV))) { __warnx(TIRPC_DEBUG_FLAG_ERROR, @@ -857,6 +865,21 @@ svc_vc_recv(SVCXPRT *xprt) TAILQ_REMOVE(&rec->ioq.ioq_uv.uvqh.qh, &xioq->ioq_s, q); xdr_ioq_reset(xioq, 0); + if (cont_recv(xprt, rlen, recv_counter++)) { + /* Submit the request to a new thread while the current + * thread will continue reading from the socket without + * adding to epoll fd. + */ + SVC_REF(xprt, SVC_REF_FLAG_NONE); + wpe = mem_alloc(sizeof(struct work_pool_entry)); + wpe_arg = mem_alloc(sizeof(struct svc_request_params)); + wpe_arg->xprt = xprt; + wpe_arg->xdrs = xioq->xdrs; + wpe->fun = svc_request_async; + wpe->arg = wpe_arg; + work_pool_submit(&svc_work_pool, wpe); + goto restart; + } if (unlikely(svc_rqst_rearm_events(xprt, SVC_XPRT_FLAG_ADDED_RECV))) { __warnx(TIRPC_DEBUG_FLAG_ERROR, "%s: %p fd %d svc_rqst_rearm_events failed (will set dead)",