From 37e8e7f26b18c370049f6f789392088c26ec41ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20M=C3=BCller?= Date: Thu, 4 Jan 2024 20:23:55 -0800 Subject: [PATCH] Advance message sending by more than one step at a time It does not seem proper to advance the sending of messages as represented by the SendMessageState type with one state transition at a time. Rather, we should advance it as far as possible, until we either cannot continue because an IO is pending, an error occurred, or we are done sending altogether. --- src/wrap.rs | 76 ++++++++++++++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/src/wrap.rs b/src/wrap.rs index 6473729..13d1e4c 100644 --- a/src/wrap.rs +++ b/src/wrap.rs @@ -81,51 +81,51 @@ enum SendMessageState { } impl SendMessageState { - /// Attempt to advance the message state by one step. + /// Attempt to advance the message state. fn advance(&mut self, sink: &mut S, ctx: &mut Context<'_>) -> Result<(), S::Error> where S: Sink + Unpin, M: Debug, { - match self { - Self::Unused => Ok(()), - Self::Pending(message) => { - match sink.poll_ready_unpin(ctx) { - Poll::Pending => return Ok(()), - Poll::Ready(Ok(())) => (), - Poll::Ready(Err(err)) => { - *self = Self::Unused; - return Err(err) - }, - } + loop { + match self { + Self::Unused => break Ok(()), + Self::Pending(message) => { + match sink.poll_ready_unpin(ctx) { + Poll::Pending => return Ok(()), + Poll::Ready(Ok(())) => (), + Poll::Ready(Err(err)) => { + *self = Self::Unused; + return Err(err) + }, + } - let message = message.take(); - *self = Self::Unused; - debug!( - channel = debug(sink as *const _), - send_msg = debug(&message) - ); + let message = message.take(); + *self = Self::Unused; + debug!( + channel = debug(sink as *const _), + send_msg = debug(&message) + ); - if let Some(message) = message { - sink.start_send_unpin(message)?; - *self = Self::Flush; - } - Ok(()) - }, - Self::Flush => { - trace!(channel = debug(sink as *const _), msg = "flushing"); - match sink.poll_flush_unpin(ctx) { - Poll::Pending => Ok(()), - Poll::Ready(Ok(())) => { - *self = Self::Unused; - Ok(()) - }, - Poll::Ready(Err(err)) => { - *self = Self::Unused; - Err(err) - }, - } - }, + if let Some(message) = message { + sink.start_send_unpin(message)?; + *self = Self::Flush; + } + }, + Self::Flush => { + trace!(channel = debug(sink as *const _), msg = "flushing"); + match sink.poll_flush_unpin(ctx) { + Poll::Pending => break Ok(()), + Poll::Ready(Ok(())) => { + *self = Self::Unused; + }, + Poll::Ready(Err(err)) => { + *self = Self::Unused; + break Err(err) + }, + } + }, + } } }