Skip to content

Commit

Permalink
update write_blocked() API
Browse files Browse the repository at this point in the history
to notify previous BIOs that subsequent ones are blocked.
  • Loading branch information
alandekok committed Nov 19, 2024
1 parent 81c9bab commit 9a22365
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 52 deletions.
49 changes: 49 additions & 0 deletions src/lib/bio/base.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ void fr_bio_cb_set(fr_bio_t *bio, fr_bio_cb_funcs_t const *cb)
*
* When a BIO hits EOF, it MUST call this function. This function will take care of changing the read()
* function to return nothing. It will also take care of walking back up the hierarchy, and calling any
* BIO EOF callbacks.
*
* Once all of the BIOs have been marked as blocked, it will call the application EOF callback.
*/
void fr_bio_eof(fr_bio_t *bio)
{
Expand Down Expand Up @@ -275,3 +278,49 @@ void fr_bio_eof(fr_bio_t *bio)
this->priv_cb.eof = NULL;
}
}

/** Internal BIO function to tell all BIOs that it's blocked.
*
* When a BIO blocks on write, it MUST call this function. This function will take care of walking back up
* the hierarchy, and calling any write_blocked callbacks.
*
* Once all of the BIOs have been marked as blocked, it will call the application write_blocked callback.
*/
int fr_bio_write_blocked(fr_bio_t *bio)
{
fr_bio_common_t *this = (fr_bio_common_t *) bio;
int is_blocked = 1;

while (true) {
fr_bio_common_t *prev = (fr_bio_common_t *) fr_bio_prev(&this->bio);
int rcode;

/*
* There are no more BIOs. Tell the application that the entire BIO chain is blocked.
*/
if (!prev) {
if (this->cb.write_blocked) {
rcode = this->cb.write_blocked(&this->bio);
if (rcode < 0) return rcode;
is_blocked &= (rcode == 1);
}
break;
}

/*
* Go to the previous BIO. If it doesn't have a write_blocked handler, then keep going
* back up the chain until we're at the top.
*/
this = prev;
if (!this->priv_cb.write_blocked) continue;

/*
* The EOF handler said it's NOT at EOF, so we stop processing here.
*/
rcode = this->priv_cb.write_blocked((fr_bio_t *) this);
if (rcode < 0) return rcode;
is_blocked &= (rcode == 1);
}

return is_blocked;
}
2 changes: 1 addition & 1 deletion src/lib/bio/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ typedef struct {
fr_bio_callback_t failed; //!< called when the BIO fails

fr_bio_io_t read_blocked;
fr_bio_io_t write_blocked;
fr_bio_io_t write_blocked; //!< returns 0 for "couldn't block", 1 for "did block".

fr_bio_io_t read_resume; //!< "unblocked" is too similar to "blocked"
fr_bio_io_t write_resume;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/bio/bio_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,5 @@ static inline void CC_HINT(nonnull) fr_bio_unchain(fr_bio_t *bio)
}

void fr_bio_eof(fr_bio_t *bio) CC_HINT(nonnull);

int fr_bio_write_blocked(fr_bio_t *bio) CC_HINT(nonnull);
4 changes: 2 additions & 2 deletions src/lib/bio/fd_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ if (rcode > 0) {
fr_assert((size_t) rcode < size);

/*
* Set the flag and run the callback.
* Set the flag, and tell the other BIOs that we're blocked.
*/
my->info.write_blocked = true;

if (my->cb.write_blocked) {
error = my->cb.write_blocked((fr_bio_t *) my);
error = fr_bio_write_blocked((fr_bio_t *) my);
if (error < 0) return error;
}

Expand Down
33 changes: 30 additions & 3 deletions src/lib/bio/mem.c
Original file line number Diff line number Diff line change
Expand Up @@ -544,16 +544,43 @@ static ssize_t fr_bio_mem_write_buffer(fr_bio_t *bio, UNUSED void *packet_ctx, v
room = fr_bio_buf_write_room(&my->write_buffer);

/*
* The buffer is full. We're now blocked.
* The buffer is full, we can't write anything.
*/
if (!room) return fr_bio_error(IO_WOULD_BLOCK);

if (room < size) size = room;
/*
* If we're asked to write more bytes than are available in the buffer, then tell the caller that
* writes are now blocked, and we can't write any more data.
*
* Return an WOULD_BLOCK error instead of breaking our promise by writing part of the data,
* instead of accepting a full application write.
*/
if (room < size) {
int rcode;

rcode = fr_bio_write_blocked(bio);
if (rcode < 0) return rcode;

return fr_bio_error(IO_WOULD_BLOCK);
}

/*
* As we have clamped the write, we know that this call must succeed.
*/
return fr_bio_buf_write(&my->write_buffer, buffer, size);
(void) fr_bio_buf_write(&my->write_buffer, buffer, size);

/*
* If we've filled the buffer, tell the caller that writes are now blocked, and we can't write
* any more data. However, we still return the amount of data we wrote.
*/
if (room == size) {
int rcode;

rcode = fr_bio_write_blocked(bio);
if (rcode < 0) return rcode;
}

return size;
}

/** Peek at the data in the read buffer
Expand Down
46 changes: 18 additions & 28 deletions src/lib/bio/packet.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@

/** Inform all of the BIOs that the write is blocked.
*
* This function should be set as the application-layer "write_blocked" callback for all BIOs created as part
* This function should be set as the BIO layer "write_blocked" callback for all BIOs created as part
* of a #fr_bio_packet_t. The application should also set bio->uctx=bio_packet for all BIOs.
*/
int fr_bio_packet_write_blocked(fr_bio_t *bio)
static int fr_bio_packet_write_blocked(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;
fr_bio_t *next;

/*
* This function must be callable multiple times, as different portions of the BIOs can block at
Expand All @@ -42,24 +41,6 @@ int fr_bio_packet_write_blocked(fr_bio_t *bio)
if (my->write_blocked) return 1;
my->write_blocked = true;

/*
* Inform each underlying BIO that it is blocked. Note that we might be called from a
* lower-layer BIO, so we have to start from the top of the chain.
*
* Note that if the callback returns 0, saying "I couldn't block", then we _still_ mark the
* overall BIO as blocked.
*/
for (next = my->bio;
next != NULL;
next = fr_bio_next(next)) {
int rcode;

if (!((fr_bio_common_t *) next)->priv_cb.write_blocked) continue;

rcode = ((fr_bio_common_t *) next)->priv_cb.write_blocked(next);
if (rcode < 0) return rcode;
}

/*
* The application doesn't want to know that it's blocked, so we just return.
*/
Expand All @@ -71,7 +52,7 @@ int fr_bio_packet_write_blocked(fr_bio_t *bio)
return my->cb.write_blocked(my);
}

int fr_bio_packet_write_resume(fr_bio_t *bio)
static int fr_bio_packet_write_resume(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;
fr_bio_t *next;
Expand Down Expand Up @@ -110,7 +91,7 @@ int fr_bio_packet_write_resume(fr_bio_t *bio)
return rcode;
}

int fr_bio_packet_read_blocked(fr_bio_t *bio)
static int fr_bio_packet_read_blocked(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;

Expand All @@ -119,7 +100,7 @@ int fr_bio_packet_read_blocked(fr_bio_t *bio)
return my->cb.read_blocked(my);
}

int fr_bio_packet_read_resume(fr_bio_t *bio)
static int fr_bio_packet_read_resume(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;

Expand Down Expand Up @@ -168,19 +149,28 @@ void fr_bio_packet_connected(fr_bio_t *bio)
my->cb.connected(my);
}

static void fr_bio_packet_shutdown(UNUSED fr_bio_t *bio)
static void fr_bio_packet_shutdown(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;

if (my->cb.shutdown) my->cb.shutdown(my);
my->cb.shutdown = NULL;
}

static void fr_bio_packet_eof(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;

if (my->cb.eof) my->cb.eof(my);
my->cb.eof = NULL;
}

static void fr_bio_packet_failed(UNUSED fr_bio_t *bio)
static void fr_bio_packet_failed(fr_bio_t *bio)
{
fr_bio_packet_t *my = bio->uctx;

if (my->cb.failed) my->cb.failed(my);
my->cb.failed = NULL;
}


Expand All @@ -201,9 +191,9 @@ void fr_bio_packet_init(fr_bio_packet_t *my)
};

/*
* Every participating BIO has us, and our callbacks set as the application-layer.
* Every participating BIO has us set as the bio->uctx, and we handle all BIO callbacks.
*
* The application sets itself as my->uctx and as our callbacks.
* The application sets its own pointer my->uctx and sets itself via our callbacks.
*/
while (bio) {
bio->uctx = my;
Expand Down
4 changes: 0 additions & 4 deletions src/lib/bio/packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,5 @@ static inline CC_HINT(nonnull) int fr_bio_packet_write_flush(fr_bio_packet_t *my

void fr_bio_packet_connected(fr_bio_t *bio) CC_HINT(nonnull);
int fr_bio_packet_connect(fr_bio_t *bio) CC_HINT(nonnull);
int fr_bio_packet_write_blocked(fr_bio_t *bio) CC_HINT(nonnull);
int fr_bio_packet_write_resume(fr_bio_t *bio) CC_HINT(nonnull);
int fr_bio_packet_read_blocked(fr_bio_t *bio) CC_HINT(nonnull);
int fr_bio_packet_read_resume(fr_bio_t *bio) CC_HINT(nonnull);

void fr_bio_packet_init(fr_bio_packet_t *my) CC_HINT(nonnull);
23 changes: 9 additions & 14 deletions src/lib/bio/retry.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ struct fr_bio_retry_s {
static void fr_bio_retry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
static void fr_bio_retry_expiry_timer(UNUSED fr_event_list_t *el, fr_time_t now, void *uctx);
static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode);
static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode);

#define fr_bio_retry_timer_clear(_x) do { \
talloc_const_free((_x)->ev); \
Expand Down Expand Up @@ -352,7 +352,7 @@ static int fr_bio_retry_write_item(fr_bio_retry_t *my, fr_bio_retry_entry_t *ite
* We didn't write the whole packet, we're blocked.
*/
if ((size_t) rcode < item->size) {
if (fr_bio_retry_blocked(my, item, rcode) < 0) return fr_bio_error(GENERIC); /* oom */
if (fr_bio_retry_save_write(my, item, rcode) < 0) return fr_bio_error(GENERIC); /* oom */

return 0;
}
Expand Down Expand Up @@ -498,14 +498,9 @@ static ssize_t fr_bio_retry_write_partial(fr_bio_t *bio, void *packet_ctx, const
return fr_bio_retry_write(bio, packet_ctx, buffer, size);
}

/** The write is blocked.
*
* We couldn't write out the entire packet, the bio is blocked. Don't write anything else until we become
* unblocked!
*
* And free the timer. There's no point in trying to write things if the socket is blocked.
/** Save a partial packet when the write becomes blocked.
*/
static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode)
static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode)
{
fr_assert(!my->partial);
fr_assert(rcode > 0);
Expand All @@ -525,12 +520,12 @@ static ssize_t fr_bio_retry_blocked(fr_bio_retry_t *my, fr_bio_retry_entry_t *it
fr_bio_buf_write(&my->buffer, item->buffer + rcode, item->size - rcode);

my->partial = item;
my->info.write_blocked = true;

/*
* There's no timer, as the write is blocked, so we can't retry.
* If the "next" BIO blocked, then the call to fr_bio_write_blocked() will have already called
* this function.
*/
fr_bio_retry_timer_clear(my);
if (fr_bio_retry_write_blocked(&my->bio) < 0) return fr_bio_error(GENERIC);

my->bio.write = fr_bio_retry_write_partial;

Expand Down Expand Up @@ -612,7 +607,7 @@ ssize_t fr_bio_retry_rewrite(fr_bio_t *bio, fr_bio_retry_entry_t *item, const vo
/*
* We had previously written the packet, so save the re-sent one, too.
*/
return fr_bio_retry_blocked(my, item, rcode);
return fr_bio_retry_save_write(my, item, rcode);
}

/** A previous timer write had a fatal error, so we forbid further writes.
Expand Down Expand Up @@ -830,7 +825,7 @@ static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *b
* We only wrote part of the packet, remember to write the rest of it.
*/
if ((size_t) rcode < size) {
return fr_bio_retry_blocked(my, item, rcode);
return fr_bio_retry_save_write(my, item, rcode);
}

/*
Expand Down

0 comments on commit 9a22365

Please sign in to comment.