Skip to content

Commit

Permalink
Advance message sending by more than one step at a time
Browse files Browse the repository at this point in the history
It does not seem proper to advance the sending of messages as
represented by the SendMessageState type with one state transition at a
time. Rather, we should advance it as far as possible, until we either
cannot continue because an IO is pending, an error occurred, or we are
done sending altogether.
  • Loading branch information
d-e-s-o committed Jan 5, 2024
1 parent bf9390b commit 37e8e7f
Showing 1 changed file with 38 additions and 38 deletions.
76 changes: 38 additions & 38 deletions src/wrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,51 +81,51 @@ enum SendMessageState<M> {
}

impl<M> SendMessageState<M> {
/// Attempt to advance the message state by one step.
/// Attempt to advance the message state.
fn advance<S>(&mut self, sink: &mut S, ctx: &mut Context<'_>) -> Result<(), S::Error>
where
S: Sink<M> + Unpin,
M: Debug,
{
match self {
Self::Unused => Ok(()),
Self::Pending(message) => {
match sink.poll_ready_unpin(ctx) {
Poll::Pending => return Ok(()),
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(err)) => {
*self = Self::Unused;
return Err(err)
},
}
loop {
match self {
Self::Unused => break Ok(()),
Self::Pending(message) => {
match sink.poll_ready_unpin(ctx) {
Poll::Pending => return Ok(()),
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(err)) => {
*self = Self::Unused;
return Err(err)
},
}

let message = message.take();
*self = Self::Unused;
debug!(
channel = debug(sink as *const _),
send_msg = debug(&message)
);
let message = message.take();
*self = Self::Unused;
debug!(
channel = debug(sink as *const _),
send_msg = debug(&message)
);

if let Some(message) = message {
sink.start_send_unpin(message)?;
*self = Self::Flush;
}
Ok(())
},
Self::Flush => {
trace!(channel = debug(sink as *const _), msg = "flushing");
match sink.poll_flush_unpin(ctx) {
Poll::Pending => Ok(()),
Poll::Ready(Ok(())) => {
*self = Self::Unused;
Ok(())
},
Poll::Ready(Err(err)) => {
*self = Self::Unused;
Err(err)
},
}
},
if let Some(message) = message {
sink.start_send_unpin(message)?;
*self = Self::Flush;
}
},
Self::Flush => {
trace!(channel = debug(sink as *const _), msg = "flushing");
match sink.poll_flush_unpin(ctx) {
Poll::Pending => break Ok(()),
Poll::Ready(Ok(())) => {
*self = Self::Unused;
},
Poll::Ready(Err(err)) => {
*self = Self::Unused;
break Err(err)
},
}
},
}
}
}

Expand Down

0 comments on commit 37e8e7f

Please sign in to comment.