Ability to close server side of BidiStream from separate goroutine #823
Comments
The handler closes the stream by returning. If you need to close it with an RPC error code, you simply have the handler return a non-nil error. Any concurrent calls to
FWIW, this library is built on

If you really need to close it from a different goroutine, then you would need to install a trampoline as the RPC handler -- the trampoline just spawns another goroutine to perform the RPC logic and then waits for either it to exit or some other error signal (this can be arranged with channels). This would allow you to signal it to exit with an error from another goroutine by sending something on a channel.

Below is an example of something like this. (Note that I just improvised all of this and haven't actually verified that it works. But it should at least be a useful outline for how to do this. It would need tests to fully flesh it out.)

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"

	"connectrpc.com/connect"
)

// WithCloseableBidiStream returns a standard bidi-stream RPC handler function that
// adapts the call so that the given action can close the stream without returning. Note
// that this could result in extra resource-usage or even resource leaks, if the given
// action fails to terminate promptly after the stream is closed.
func WithCloseableBidiStream[Req, Resp any](
	action func(context.Context, *CloseableBidiStream[Req, Resp]) error,
) func(context.Context, *connect.BidiStream[Req, Resp]) error {
	return func(ctx context.Context, stream *connect.BidiStream[Req, Resp]) error {
		// Buffered so that the single sender never blocks, even if the
		// handler has already returned.
		ch := make(chan error, 1)
		closeableStream := &CloseableBidiStream[Req, Resp]{BidiStream: stream, closeChan: ch}
		go func() {
			defer func() {
				// Forward a panic in action to the handler goroutine.
				if val := recover(); val != nil {
					_ = closeableStream.CloseWithError(panicError{val})
				}
			}()
			err := action(ctx, closeableStream)
			_ = closeableStream.CloseWithError(err)
		}()
		// Wait for action to finish or for some goroutine to close the stream.
		err := <-ch
		if pErr, ok := err.(panicError); ok {
			panic(pErr.val)
		}
		return err
	}
}

// CloseableBidiStream is just like a connect.BidiStream except that it has a Close
// method which can be called to terminate a stream, as an alternative to the main
// handler function returning. See WithCloseableBidiStream.
type CloseableBidiStream[Req, Resp any] struct {
	*connect.BidiStream[Req, Resp]
	closed    atomic.Bool
	closeChan chan error
}

// Close implements io.Closer. The client will receive an "OK" status for the
// stream. To send an RPC error code, use CloseWithError instead.
func (c *CloseableBidiStream[_, _]) Close() error {
	return c.CloseWithError(nil)
}

// CloseWithError closes the stream and optionally sends back an RPC error
// based on the given err. If err is nil, an "OK" status is sent. Only the
// first call takes effect; later calls return an error.
func (c *CloseableBidiStream[_, _]) CloseWithError(err error) error {
	if !c.closed.CompareAndSwap(false, true) {
		return errors.New("stream already closed")
	}
	c.closeChan <- err
	return nil
}

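// panicError carries a recovered panic value from the action goroutine
// back to the handler goroutine.
type panicError struct {
	val any
}

func (p panicError) Error() string {
	return fmt.Sprintf("handler panicked: %v", p.val)
}
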
// Example usage
func (h *handler) SomeBidiRPCMethod(ctx context.Context, stream *connect.BidiStream[pb.Request, pb.Response]) error {
	return WithCloseableBidiStream(func(ctx context.Context, stream *CloseableBidiStream[pb.Request, pb.Response]) error {
		// Do some logic ... This could spawn some other goroutine that later wants to call stream.Close()
		err := doSomethingInteresting(ctx, stream)
		return err
	})(ctx, stream)
}

For now, I will leave this issue open, since it's a reasonable feature request: perhaps something like the above belongs in this module as supported, exported API, or in a companion module that contains useful helpers (similar to the kind of stuff in grpc-ecosystem/go-grpc-middleware).
It totally depends on the RPC handler's needs. It is quite common practice to not put it in another goroutine, in which case the

But if your RPC logic needs to receive a message and may need to abort that call due to other circumstances, then it needs to do so in a separate goroutine. I would likely recommend, in your case, just calling
Thanks for the insight @jhump - yeah something like your example was what first came to mind but while reasoning through it, I realized it's just simpler to move

Thanks for keeping the issue open - yeah I'm familiar with some of the limitations of
Is your feature request related to a problem? Please describe.
I am implementing a BidiStream which maps messages back and forth with a separate service that is similar to a BidiStream (technically it's a websocket service). I couldn't find a way to receive messages from the other service in a separate goroutine, since it would have no way to terminate a stream.Receive call AFAICT.

For an error case I want to end this BidiStream, but with stream.Receive() blocking for a message, it doesn't seem to be possible. While a client has close methods on the stream, the server does not.

Describe the solution you'd like
Either is fine:
- Close(error) on BidiStream. A pending Receive() will return with the error. If an error is not accepted by Close, then a generic error that can be matched with errors.Is can be returned instead.
- ctx to Receive (likely a new method like ReceiveContext(ctx)). When the context is canceled, the receive method returns. This was brought up in "Support context.Context in BidiStream's Receive()" #735, but IIUC it was a somewhat different case than this one. But I guess with the current Go stdlib this may not be implementable (more direct access to the http2 stream would be needed for proper cancellation).

Describe alternatives you've considered
If you've proposed a solution, are there any alternatives? Why are they worse than your preferred approach?
Always call stream.Receive in another goroutine, instead of vice-versa. In my example, sess does have a Close method which would cause sess.Receive() in the main goroutine to return with error, similar to my proposal. And if it was otherwise a connect-go BidiStream client, it would also be closeable. I don't know if there is some hidden "always receive client messages in another goroutine" connect best practice, but intuitively there shouldn't be I guess.

Additional context
Add any other context or screenshots about the feature request here.