From 08fbd2171003016b5594b78608f7a7ba1c1ffe4e Mon Sep 17 00:00:00 2001 From: "Alan T. DeKok" Date: Mon, 18 Nov 2024 13:48:40 -0500 Subject: [PATCH] update the handling of EOF one BIO can tell fr_bio_eof() that it's at EOF. That function will take care of calling the various BIO internal EOF functions until such time as it's at the first BIO. At which point it will call the application EOF function. --- src/lib/bio/base.c | 48 ++++++++++++++++++++++++++++++------------ src/lib/bio/bio_priv.h | 2 +- src/lib/bio/fd.c | 13 ++++++------ src/lib/bio/fd_errno.h | 1 - src/lib/bio/mem.c | 18 ++++++++++++++-- src/lib/bio/pipe.c | 19 +++++++++++++---- 6 files changed, 72 insertions(+), 29 deletions(-) diff --git a/src/lib/bio/base.c b/src/lib/bio/base.c index ecabbdde0fe8..8968ae99942e 100644 --- a/src/lib/bio/base.c +++ b/src/lib/bio/base.c @@ -231,28 +231,48 @@ void fr_bio_cb_set(fr_bio_t *bio, fr_bio_cb_funcs_t const *cb) /** Internal BIO function to run EOF callbacks. * - * The EOF callbacks are _internal_, and tell the various BIOs that there is nothing more to read from the - * BIO. - * - * @todo - do we need to have separate _write_ EOF? Likely not. + * When a BIO hits EOF, it MUST call this function. This function will take care of changing the read() + * function to return nothing. It will also take care of walking back up the hierarchy, and calling any */ void fr_bio_eof(fr_bio_t *bio) { - fr_bio_t *x = bio; + fr_bio_common_t *this = (fr_bio_common_t *) bio; /* - * Start from the first BIO. + * This BIO is at EOF. So we can't call read() any more. */ - while (fr_bio_prev(x) != NULL) x = fr_bio_prev(x); + this->bio.read = fr_bio_null_read; - /* - * Shut each one down, including the one which called us. - */ - while (x) { - fr_bio_common_t *this = (fr_bio_common_t *) x; + while (true) { + fr_bio_common_t *prev = (fr_bio_common_t *) fr_bio_prev(&this->bio); + + /* + * There are no more BIOs. Tell the application that the entire BIO chain is at EOF. + */ + if (!prev) { + if (this->cb.eof) { + prev->cb.eof(&this->bio); + prev->cb.eof = NULL; + } + break; + } - if (this->priv_cb.eof) this->priv_cb.eof((fr_bio_t *) this); + /* + * Go to the previous BIO. If it doesn't have an EOF handler, then keep going back up + * the chain until we're at the top. + */ + this = prev; + if (!this->priv_cb.eof) continue; - x = fr_bio_next(x); + /* + * The EOF handler said it's NOT at EOF, so we stop processing here. + */ + if (this->priv_cb.eof((fr_bio_t *) this) == 0) break; + + /* + * Don't run the EOF callback multiple times. + */ + this->priv_cb.eof = NULL; + break; } } diff --git a/src/lib/bio/bio_priv.h b/src/lib/bio/bio_priv.h index 7a4a67f39569..48fc0af61fb2 100644 --- a/src/lib/bio/bio_priv.h +++ b/src/lib/bio/bio_priv.h @@ -36,7 +36,7 @@ typedef struct fr_bio_common_s fr_bio_common_t; typedef struct { fr_bio_io_t connected; fr_bio_callback_t shutdown; - fr_bio_callback_t eof; + fr_bio_io_t eof; fr_bio_callback_t failed; fr_bio_io_t read_blocked; diff --git a/src/lib/bio/fd.c b/src/lib/bio/fd.c index d6f9c4c22cf4..d0b41811de50 100644 --- a/src/lib/bio/fd.c +++ b/src/lib/bio/fd.c @@ -100,8 +100,6 @@ static int fr_bio_fd_destructor(fr_bio_fd_t *my) fr_assert(!fr_bio_prev(&my->bio)); fr_assert(!fr_bio_next(&my->bio)); - if (!my->info.eof && my->cb.eof) my->cb.eof(&my->bio); - if (my->connect.ev) { talloc_const_free(my->connect.ev); my->connect.ev = NULL; @@ -117,13 +115,15 @@ static int fr_bio_fd_destructor(fr_bio_fd_t *my) return fr_bio_fd_close(&my->bio); } -static void fr_bio_fd_eof(fr_bio_t *bio) +static int fr_bio_fd_eof(fr_bio_t *bio) { - fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t); - bio->read = fr_bio_null_read; bio->write = fr_bio_null_write; - my->info.eof = true; + + /* + * Nothing more for us to do, tell fr_bio_eof() that it can continue with poking other BIOs. + */ + return 1; } static int fr_bio_fd_write_resume(fr_bio_t *bio) @@ -150,7 +150,6 @@ static ssize_t fr_bio_fd_read_stream(fr_bio_t *bio, UNUSED void *packet_ctx, voi rcode = read(my->info.socket.fd, buffer, size); if (rcode == 0) { fr_bio_eof(bio); - if (my->cb.eof) my->cb.eof(&my->bio); /* inform the application that we're at EOF */ return 0; } diff --git a/src/lib/bio/fd_errno.h b/src/lib/bio/fd_errno.h index 7f58e0eb499c..df2f42a08102 100644 --- a/src/lib/bio/fd_errno.h +++ b/src/lib/bio/fd_errno.h @@ -44,7 +44,6 @@ case EPIPE: * and set EOF on the BIO. */ fr_bio_eof(&my->bio); - if (my->cb.eof) my->cb.eof(&my->bio); /* inform the application that we're at EOF */ return 0; default: diff --git a/src/lib/bio/mem.c b/src/lib/bio/mem.c index 8d7f8b0cf40d..59a8d3db55ca 100644 --- a/src/lib/bio/mem.c +++ b/src/lib/bio/mem.c @@ -59,7 +59,12 @@ static ssize_t fr_bio_mem_read_eof(fr_bio_t *bio, UNUSED void *packet_ctx, void * No more data: return EOF from now on. */ if (fr_bio_buf_used(&my->read_buffer) == 0) { - my->bio.read = fr_bio_null_read; + + /* + * Don't call our EOF function. But do tell the other BIOs that we're at EOF. + */ + my->priv_cb.eof = NULL; + fr_bio_eof(&my->bio); return 0; } @@ -69,11 +74,20 @@ static ssize_t fr_bio_mem_read_eof(fr_bio_t *bio, UNUSED void *packet_ctx, void return fr_bio_buf_read(&my->read_buffer, buffer, size); } -static void fr_bio_mem_eof(fr_bio_t *bio) +static int fr_bio_mem_eof(fr_bio_t *bio) { fr_bio_mem_t *my = talloc_get_type_abort(bio, fr_bio_mem_t); + /* + * Nothing more for us to read, tell fr_bio_eof() that it can continue with poking other BIOs. + */ + if (fr_bio_buf_used(&my->read_buffer) == 0) { + return 1; + } + my->bio.read = fr_bio_mem_read_eof; + + return 0; } /** Read from a memory BIO diff --git a/src/lib/bio/pipe.c b/src/lib/bio/pipe.c index 8519c6f97b17..a4cf06eec502 100644 --- a/src/lib/bio/pipe.c +++ b/src/lib/bio/pipe.c @@ -66,8 +66,14 @@ static ssize_t fr_bio_pipe_read(fr_bio_t *bio, void *packet_ctx, void *buffer, s pthread_mutex_lock(&my->mutex); rcode = my->next->read(my->next, packet_ctx, buffer, size); if ((rcode == 0) && my->eof) { - rcode = 0; - my->bio.read = fr_bio_null_read; + pthread_mutex_unlock(&my->mutex); + + /* + * Don't call our EOF function. But do tell the other BIOs that we're at EOF. + */ + my->priv_cb.eof = NULL; + fr_bio_eof(bio); + return 0; } else if (rcode > 0) { /* @@ -134,14 +140,19 @@ static void fr_bio_pipe_shutdown(fr_bio_t *bio) * Either side can set EOF, in which case pending reads are still processed. Writes return EOF immediately. * Readers return pending data, and then EOF. */ -static void fr_bio_pipe_eof(fr_bio_t *bio) +static int fr_bio_pipe_eof(fr_bio_t *bio) { fr_bio_pipe_t *my = talloc_get_type_abort(bio, fr_bio_pipe_t); pthread_mutex_lock(&my->mutex); my->eof = true; - fr_bio_eof(my->next); pthread_mutex_unlock(&my->mutex); + + /* + * We don't know if the other end is at EOF, we have to do a read. So we tell fr_bio_eof() to + * stop processing. + */ + return 0; } /** Allocate a thread-safe pipe which can be used for both reads and writes.