diff --git a/src/lib/bio/base.c b/src/lib/bio/base.c index a5efde18d1881..1039e999c3afd 100644 --- a/src/lib/bio/base.c +++ b/src/lib/bio/base.c @@ -233,6 +233,9 @@ void fr_bio_cb_set(fr_bio_t *bio, fr_bio_cb_funcs_t const *cb) * * When a BIO hits EOF, it MUST call this function. This function will take care of changing the read() * function to return nothing. It will also take care of walking back up the hierarchy, and calling any + * BIO EOF callbacks. + * + * Once all of the BIOs have been marked as blocked, it will call the application EOF callback. */ void fr_bio_eof(fr_bio_t *bio) { @@ -275,3 +278,49 @@ void fr_bio_eof(fr_bio_t *bio) this->priv_cb.eof = NULL; } } + +/** Internal BIO function to tell all BIOs that it's blocked. + * + * When a BIO blocks on write, it MUST call this function. This function will take care of walking back up + * the hierarchy, and calling any write_blocked callbacks. + * + * Once all of the BIOs have been marked as blocked, it will call the application write_blocked callback. + */ +int fr_bio_write_blocked(fr_bio_t *bio) +{ + fr_bio_common_t *this = (fr_bio_common_t *) bio; + int is_blocked = 1; + + while (true) { + fr_bio_common_t *prev = (fr_bio_common_t *) fr_bio_prev(&this->bio); + int rcode; + + /* + * There are no more BIOs. Tell the application that the entire BIO chain is blocked. + */ + if (!prev) { + if (this->cb.write_blocked) { + rcode = this->cb.write_blocked(&this->bio); + if (rcode < 0) return rcode; + is_blocked &= (rcode == 1); + } + break; + } + + /* + * Go to the previous BIO. If it doesn't have a write_blocked handler, then keep going + * back up the chain until we're at the top. + */ + this = prev; + if (!this->priv_cb.write_blocked) continue; + + /* + * The EOF handler said it's NOT at EOF, so we stop processing here. + */ + rcode = this->priv_cb.write_blocked((fr_bio_t *) this); + if (rcode < 0) return rcode; + is_blocked &= (rcode == 1); + } + + return is_blocked; +} diff --git a/src/lib/bio/base.h b/src/lib/bio/base.h index 29d5fefd434ec..d6d88389c9a5b 100644 --- a/src/lib/bio/base.h +++ b/src/lib/bio/base.h @@ -90,7 +90,7 @@ typedef struct { fr_bio_callback_t failed; //!< called when the BIO fails fr_bio_io_t read_blocked; - fr_bio_io_t write_blocked; + fr_bio_io_t write_blocked; //!< returns 0 for "couldn't block", 1 for "did block". fr_bio_io_t read_resume; //!< "unblocked" is too similar to "blocked" fr_bio_io_t write_resume; diff --git a/src/lib/bio/bio_priv.h b/src/lib/bio/bio_priv.h index 48fc0af61fb2f..55195b4b9de3f 100644 --- a/src/lib/bio/bio_priv.h +++ b/src/lib/bio/bio_priv.h @@ -93,3 +93,5 @@ static inline void CC_HINT(nonnull) fr_bio_unchain(fr_bio_t *bio) } void fr_bio_eof(fr_bio_t *bio) CC_HINT(nonnull); + +int fr_bio_write_blocked(fr_bio_t *bio) CC_HINT(nonnull); diff --git a/src/lib/bio/fd_write.h b/src/lib/bio/fd_write.h index f34c99df2af10..b4a1b16d18bb7 100644 --- a/src/lib/bio/fd_write.h +++ b/src/lib/bio/fd_write.h @@ -20,12 +20,12 @@ if (rcode > 0) { fr_assert((size_t) rcode < size); /* - * Set the flag and run the callback. + * Set the flag, and tell the other BIOs that we're blocked. */ my->info.write_blocked = true; if (my->cb.write_blocked) { - error = my->cb.write_blocked((fr_bio_t *) my); + error = fr_bio_write_blocked((fr_bio_t *) my); if (error < 0) return error; } diff --git a/src/lib/bio/mem.c b/src/lib/bio/mem.c index 59a8d3db55caf..a93341c5a1f54 100644 --- a/src/lib/bio/mem.c +++ b/src/lib/bio/mem.c @@ -544,16 +544,43 @@ static ssize_t fr_bio_mem_write_buffer(fr_bio_t *bio, UNUSED void *packet_ctx, v room = fr_bio_buf_write_room(&my->write_buffer); /* - * The buffer is full. We're now blocked. + * The buffer is full, we can't write anything. */ if (!room) return fr_bio_error(IO_WOULD_BLOCK); - if (room < size) size = room; + /* + * If we're asked to write more bytes than are available in the buffer, then tell the caller that + * writes are now blocked, and we can't write any more data. + * + * Return an WOULD_BLOCK error instead of breaking our promise by writing part of the data, + * instead of accepting a full application write. + */ + if (room < size) { + int rcode; + + rcode = fr_bio_write_blocked(bio); + if (rcode < 0) return rcode; + + return fr_bio_error(IO_WOULD_BLOCK); + } /* * As we have clamped the write, we know that this call must succeed. */ - return fr_bio_buf_write(&my->write_buffer, buffer, size); + (void) fr_bio_buf_write(&my->write_buffer, buffer, size); + + /* + * If we've filled the buffer, tell the caller that writes are now blocked, and we can't write + * any more data. However, we still return the amount of data we wrote. + */ + if (room == size) { + int rcode; + + rcode = fr_bio_write_blocked(bio); + if (rcode < 0) return rcode; + } + + return size; } /** Peek at the data in the read buffer diff --git a/src/lib/bio/packet.c b/src/lib/bio/packet.c index 9d7cc7534eb0c..bdf72473a940e 100644 --- a/src/lib/bio/packet.c +++ b/src/lib/bio/packet.c @@ -27,13 +27,12 @@ /** Inform all of the BIOs that the write is blocked. * - * This function should be set as the application-layer "write_blocked" callback for all BIOs created as part + * This function should be set as the BIO layer "write_blocked" callback for all BIOs created as part * of a #fr_bio_packet_t. The application should also set bio->uctx=bio_packet for all BIOs. */ -int fr_bio_packet_write_blocked(fr_bio_t *bio) +static int fr_bio_packet_write_blocked(fr_bio_t *bio) { fr_bio_packet_t *my = bio->uctx; - fr_bio_t *next; /* * This function must be callable multiple times, as different portions of the BIOs can block at @@ -42,24 +41,6 @@ int fr_bio_packet_write_blocked(fr_bio_t *bio) if (my->write_blocked) return 1; my->write_blocked = true; - /* - * Inform each underlying BIO that it is blocked. Note that we might be called from a - * lower-layer BIO, so we have to start from the top of the chain. - * - * Note that if the callback returns 0, saying "I couldn't block", then we _still_ mark the - * overall BIO as blocked. - */ - for (next = my->bio; - next != NULL; - next = fr_bio_next(next)) { - int rcode; - - if (!((fr_bio_common_t *) next)->priv_cb.write_blocked) continue; - - rcode = ((fr_bio_common_t *) next)->priv_cb.write_blocked(next); - if (rcode < 0) return rcode; - } - /* * The application doesn't want to know that it's blocked, so we just return. */ @@ -71,7 +52,7 @@ int fr_bio_packet_write_blocked(fr_bio_t *bio) return my->cb.write_blocked(my); } -int fr_bio_packet_write_resume(fr_bio_t *bio) +static int fr_bio_packet_write_resume(fr_bio_t *bio) { fr_bio_packet_t *my = bio->uctx; fr_bio_t *next; @@ -110,7 +91,7 @@ int fr_bio_packet_write_resume(fr_bio_t *bio) return rcode; } -int fr_bio_packet_read_blocked(fr_bio_t *bio) +static int fr_bio_packet_read_blocked(fr_bio_t *bio) { fr_bio_packet_t *my = bio->uctx; @@ -119,7 +100,7 @@ int fr_bio_packet_read_blocked(fr_bio_t *bio) return my->cb.read_blocked(my); } -int fr_bio_packet_read_resume(fr_bio_t *bio) +static int fr_bio_packet_read_resume(fr_bio_t *bio) { fr_bio_packet_t *my = bio->uctx; @@ -168,8 +149,12 @@ void fr_bio_packet_connected(fr_bio_t *bio) my->cb.connected(my); } -static void fr_bio_packet_shutdown(UNUSED fr_bio_t *bio) +static void fr_bio_packet_shutdown(fr_bio_t *bio) { + fr_bio_packet_t *my = bio->uctx; + + if (my->cb.shutdown) my->cb.shutdown(my); + my->cb.shutdown = NULL; } static void fr_bio_packet_eof(fr_bio_t *bio) @@ -177,10 +162,15 @@ static void fr_bio_packet_eof(fr_bio_t *bio) fr_bio_packet_t *my = bio->uctx; if (my->cb.eof) my->cb.eof(my); + my->cb.eof = NULL; } -static void fr_bio_packet_failed(UNUSED fr_bio_t *bio) +static void fr_bio_packet_failed(fr_bio_t *bio) { + fr_bio_packet_t *my = bio->uctx; + + if (my->cb.failed) my->cb.failed(my); + my->cb.failed = NULL; } @@ -201,9 +191,9 @@ void fr_bio_packet_init(fr_bio_packet_t *my) }; /* - * Every participating BIO has us, and our callbacks set as the application-layer. + * Every participating BIO has us set as the bio->uctx, and we handle all BIO callbacks. * - * The application sets itself as my->uctx and as our callbacks. + * The application sets its own pointer my->uctx and sets itself via our callbacks. */ while (bio) { bio->uctx = my; diff --git a/src/lib/bio/packet.h b/src/lib/bio/packet.h index d983993057385..67853fd048f65 100644 --- a/src/lib/bio/packet.h +++ b/src/lib/bio/packet.h @@ -181,9 +181,5 @@ static inline CC_HINT(nonnull) int fr_bio_packet_write_flush(fr_bio_packet_t *my void fr_bio_packet_connected(fr_bio_t *bio) CC_HINT(nonnull); int fr_bio_packet_connect(fr_bio_t *bio) CC_HINT(nonnull); -int fr_bio_packet_write_blocked(fr_bio_t *bio) CC_HINT(nonnull); -int fr_bio_packet_write_resume(fr_bio_t *bio) CC_HINT(nonnull); -int fr_bio_packet_read_blocked(fr_bio_t *bio) CC_HINT(nonnull); -int fr_bio_packet_read_resume(fr_bio_t *bio) CC_HINT(nonnull); void fr_bio_packet_init(fr_bio_packet_t *my) CC_HINT(nonnull); diff --git a/src/lib/bio/retry.c b/src/lib/bio/retry.c index 78a62d2eb20aa..4d6439b8ceb60 100644 --- a/src/lib/bio/retry.c +++ b/src/lib/bio/retry.c @@ -127,7 +127,7 @@ struct fr_bio_retry_s { static void fr_bio_retry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx); static void fr_bio_retry_expiry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx); static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size); -static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode); +static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode); #define fr_bio_retry_timer_clear(_x) do { \ talloc_const_free((_x)->ev); \ @@ -352,7 +352,7 @@ static int fr_bio_retry_write_item(fr_bio_retry_t *my, fr_bio_retry_entry_t *ite * We didn't write the whole packet, we're blocked. */ if ((size_t) rcode < item->size) { - if (fr_bio_retry_blocked(my, item, rcode) < 0) return fr_bio_error(GENERIC); /* oom */ + if (fr_bio_retry_save_write(my, item, rcode) < 0) return fr_bio_error(GENERIC); /* oom */ return 0; } @@ -498,14 +498,9 @@ static ssize_t fr_bio_retry_write_partial(fr_bio_t *bio, void *packet_ctx, const return fr_bio_retry_write(bio, packet_ctx, buffer, size); } -/** The write is blocked. - * - * We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become - * unblocked! - * - * And free the timer. There's no point in trying to write things if the socket is blocked. +/** Save a partial packet when the write becomes blocked. */ -static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode) +static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode) { fr_assert(!my->partial); fr_assert(rcode > 0); @@ -525,12 +520,12 @@ static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *it fr_bio_buf_write(&my->buffer, item->buffer + rcode, item->size - rcode); my->partial = item; - my->info.write_blocked = true; /* - * There's no timer, as the write is blocked, so we can't retry. + * If the "next" BIO blocked, then the call to fr_bio_write_blocked() will have already called + * this function. */ - fr_bio_retry_timer_clear(my); + if (fr_bio_retry_write_blocked(&my->bio) < 0) return fr_bio_error(GENERIC); my->bio.write = fr_bio_retry_write_partial; @@ -612,7 +607,7 @@ ssize_t fr_bio_retry_rewrite(fr_bio_t *bio, fr_bio_retry_entry_t *item, const vo /* * We had previously written the packet, so save the re-sent one, too. */ - return fr_bio_retry_blocked(my, item, rcode); + return fr_bio_retry_save_write(my, item, rcode); } /** A previous timer write had a fatal error, so we forbid further writes. @@ -830,7 +825,7 @@ static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *b * We only wrote part of the packet, remember to write the rest of it. */ if ((size_t) rcode < size) { - return fr_bio_retry_blocked(my, item, rcode); + return fr_bio_retry_save_write(my, item, rcode); } /*