diff --git a/src/lib/bio/base.c b/src/lib/bio/base.c index ecabbdde0fe8d..007c578cb1324 100644 --- a/src/lib/bio/base.c +++ b/src/lib/bio/base.c @@ -231,28 +231,48 @@ void fr_bio_cb_set(fr_bio_t *bio, fr_bio_cb_funcs_t const *cb) /** Internal BIO function to run EOF callbacks. * - * The EOF callbacks are _internal_, and tell the various BIOs that there is nothing more to read from the - * BIO. - * - * @todo - do we need to have separate _write_ EOF? Likely not. + * When a BIO hits EOF, it MUST call this function. This function will take care of changing the read() + * function to return nothing. It will also take care of walking back up the hierarchy, and calling any */ void fr_bio_eof(fr_bio_t *bio) { - fr_bio_t *x = bio; + fr_bio_common_t *this = (fr_bio_common_t *) bio; /* - * Start from the first BIO. + * This BIO is at EOF. So we can't call read() any more. */ - while (fr_bio_prev(x) != NULL) x = fr_bio_prev(x); + this->bio.read = fr_bio_null_read; - /* - * Shut each one down, including the one which called us. - */ - while (x) { - fr_bio_common_t *this = (fr_bio_common_t *) x; + while (true) { + fr_bio_common_t *prev = (fr_bio_common_t *) fr_bio_prev(&this->bio); + + /* + * There are no more BIOs. Tell the application that the entire BIO chain is at EOF. + */ + if (!prev) { + if (this->cb.eof) { + this->cb.eof(&this->bio); + this->cb.eof = NULL; + } + break; + } - if (this->priv_cb.eof) this->priv_cb.eof((fr_bio_t *) this); + /* + * Go to the previous BIO. If it doesn't have an EOF handler, then keep going back up + * the chain until we're at the top. + */ + this = prev; + if (!this->priv_cb.eof) continue; - x = fr_bio_next(x); + /* + * The EOF handler said it's NOT at EOF, so we stop processing here. + */ + if (this->priv_cb.eof((fr_bio_t *) this) == 0) break; + + /* + * Don't run the EOF callback multiple times. + */ + this->priv_cb.eof = NULL; + break; } } diff --git a/src/lib/bio/bio_priv.h b/src/lib/bio/bio_priv.h index 7a4a67f395692..48fc0af61fb2f 100644 --- a/src/lib/bio/bio_priv.h +++ b/src/lib/bio/bio_priv.h @@ -36,7 +36,7 @@ typedef struct fr_bio_common_s fr_bio_common_t; typedef struct { fr_bio_io_t connected; fr_bio_callback_t shutdown; - fr_bio_callback_t eof; + fr_bio_io_t eof; fr_bio_callback_t failed; fr_bio_io_t read_blocked; diff --git a/src/lib/bio/fd.c b/src/lib/bio/fd.c index d6f9c4c22cf42..d0b41811de504 100644 --- a/src/lib/bio/fd.c +++ b/src/lib/bio/fd.c @@ -100,8 +100,6 @@ static int fr_bio_fd_destructor(fr_bio_fd_t *my) fr_assert(!fr_bio_prev(&my->bio)); fr_assert(!fr_bio_next(&my->bio)); - if (!my->info.eof && my->cb.eof) my->cb.eof(&my->bio); - if (my->connect.ev) { talloc_const_free(my->connect.ev); my->connect.ev = NULL; @@ -117,13 +115,15 @@ static int fr_bio_fd_destructor(fr_bio_fd_t *my) return fr_bio_fd_close(&my->bio); } -static void fr_bio_fd_eof(fr_bio_t *bio) +static int fr_bio_fd_eof(fr_bio_t *bio) { - fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t); - bio->read = fr_bio_null_read; bio->write = fr_bio_null_write; - my->info.eof = true; + + /* + * Nothing more for us to do, tell fr_bio_eof() that it can continue with poking other BIOs. + */ + return 1; } static int fr_bio_fd_write_resume(fr_bio_t *bio) @@ -150,7 +150,6 @@ static ssize_t fr_bio_fd_read_stream(fr_bio_t *bio, UNUSED void *packet_ctx, voi rcode = read(my->info.socket.fd, buffer, size); if (rcode == 0) { fr_bio_eof(bio); - if (my->cb.eof) my->cb.eof(&my->bio); /* inform the application that we're at EOF */ return 0; } diff --git a/src/lib/bio/fd_errno.h b/src/lib/bio/fd_errno.h index 7f58e0eb499ce..df2f42a08102f 100644 --- a/src/lib/bio/fd_errno.h +++ b/src/lib/bio/fd_errno.h @@ -44,7 +44,6 @@ case EPIPE: * and set EOF on the BIO. */ fr_bio_eof(&my->bio); - if (my->cb.eof) my->cb.eof(&my->bio); /* inform the application that we're at EOF */ return 0; default: diff --git a/src/lib/bio/mem.c b/src/lib/bio/mem.c index 8d7f8b0cf40d3..59a8d3db55caf 100644 --- a/src/lib/bio/mem.c +++ b/src/lib/bio/mem.c @@ -59,7 +59,12 @@ static ssize_t fr_bio_mem_read_eof(fr_bio_t *bio, UNUSED void *packet_ctx, void * No more data: return EOF from now on. */ if (fr_bio_buf_used(&my->read_buffer) == 0) { - my->bio.read = fr_bio_null_read; + + /* + * Don't call our EOF function. But do tell the other BIOs that we're at EOF. + */ + my->priv_cb.eof = NULL; + fr_bio_eof(&my->bio); return 0; } @@ -69,11 +74,20 @@ static ssize_t fr_bio_mem_read_eof(fr_bio_t *bio, UNUSED void *packet_ctx, void return fr_bio_buf_read(&my->read_buffer, buffer, size); } -static void fr_bio_mem_eof(fr_bio_t *bio) +static int fr_bio_mem_eof(fr_bio_t *bio) { fr_bio_mem_t *my = talloc_get_type_abort(bio, fr_bio_mem_t); + /* + * Nothing more for us to read, tell fr_bio_eof() that it can continue with poking other BIOs. + */ + if (fr_bio_buf_used(&my->read_buffer) == 0) { + return 1; + } + my->bio.read = fr_bio_mem_read_eof; + + return 0; } /** Read from a memory BIO diff --git a/src/lib/bio/pipe.c b/src/lib/bio/pipe.c index 8519c6f97b179..a4cf06eec5022 100644 --- a/src/lib/bio/pipe.c +++ b/src/lib/bio/pipe.c @@ -66,8 +66,14 @@ static ssize_t fr_bio_pipe_read(fr_bio_t *bio, void *packet_ctx, void *buffer, s pthread_mutex_lock(&my->mutex); rcode = my->next->read(my->next, packet_ctx, buffer, size); if ((rcode == 0) && my->eof) { - rcode = 0; - my->bio.read = fr_bio_null_read; + pthread_mutex_unlock(&my->mutex); + + /* + * Don't call our EOF function. But do tell the other BIOs that we're at EOF. + */ + my->priv_cb.eof = NULL; + fr_bio_eof(bio); + return 0; } else if (rcode > 0) { /* @@ -134,14 +140,19 @@ static void fr_bio_pipe_shutdown(fr_bio_t *bio) * Either side can set EOF, in which case pending reads are still processed. Writes return EOF immediately. * Readers return pending data, and then EOF. */ -static void fr_bio_pipe_eof(fr_bio_t *bio) +static int fr_bio_pipe_eof(fr_bio_t *bio) { fr_bio_pipe_t *my = talloc_get_type_abort(bio, fr_bio_pipe_t); pthread_mutex_lock(&my->mutex); my->eof = true; - fr_bio_eof(my->next); pthread_mutex_unlock(&my->mutex); + + /* + * We don't know if the other end is at EOF, we have to do a read. So we tell fr_bio_eof() to + * stop processing. + */ + return 0; } /** Allocate a thread-safe pipe which can be used for both reads and writes.