Ability to close server side of BidiStream from separate goroutine #823

Open
anuraaga opened this issue Feb 5, 2025 · 4 comments
Labels
enhancement New feature or request

Comments


anuraaga commented Feb 5, 2025

Is your feature request related to a problem? Please describe.

I am implementing a BidiStream handler that relays messages to and from a separate service that behaves like a bidi stream (technically it's a websocket service). I couldn't find a way to receive messages from the other service in a separate goroutine, since that goroutine would have no way to terminate a pending stream.Receive call AFAICT.

func (h *Handler) Chat(ctx context.Context, stream *connect.BidiStream[frontendapi.ChatRequest, frontendapi.ChatResponse]) error {
	sess := genai.Connect(...)
	defer sess.Close()
	go func() {
		for {
			msg, err := sess.Receive()
			if err != nil {
				// Want to close BidiStream
				println(err.Error())
				return
			}
			for _, p := range msg.ServerContent.ModelTurn.Parts {
				if p.InlineData == nil {
					continue
				}
				if !strings.HasPrefix(p.InlineData.MIMEType, "audio/") {
					continue
				}
				if err := stream.Send(&frontendapi.ChatResponse{
					Content: &frontendapi.ChatContent{
						Payload: &frontendapi.ChatContent_Audio{
							Audio: p.InlineData.Data,
						},
					},
				}); err != nil {
					println(err.Error())
					return
				}
			}
		}
	}()

	for {
		msg, err := stream.Receive() // BidiStream is stuck here and can't be canceled I think
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("chat: receiving message: %w", err)
		}
		if p, ok := msg.GetContent().GetPayload().(*frontendapi.ChatContent_Audio); ok {
			if err := sess.Send(&genai.LiveClientMessage{
				RealtimeInput: &genai.LiveClientRealtimeInput{
					MediaChunks: []*genai.Blob{
						{
							MIMEType: "audio/wav",
							Data:     p.Audio,
						},
					},
				},
			}); err != nil {
				return fmt.Errorf("chat: sending audio to genai: %w", err)
			}
		}
	}
}

In an error case I want to end this BidiStream, but with stream.Receive() blocked waiting for a message, that doesn't seem to be possible. While a client has close methods on the stream, the server does not.

Describe the solution you'd like

Either is fine:

  • Add Close(error) on BidiStream. A pending Receive() will return with the error. If Close should not accept an error, then a generic error that can be matched with errors.Is can be returned instead.
  • Add ctx to Receive (likely as a new method like ReceiveContext(ctx)); when the context is canceled, the receive method returns. This was brought up in "Support context.Context in BidiStream's Receive()" #735, but IIUC it was a somewhat different case than this one. I guess with the current Go stdlib this may not be implementable, though (more direct access to the http2 stream would be needed for proper cancellation).
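
For illustration, the second option can be approximated in user code today. The sketch below is a hypothetical helper (receiveWithContext is not part of connect-go) that wraps any blocking receive function and gives up when the context is done. It uses a plain func() (T, error) as a stand-in for stream.Receive so that it runs with the standard library alone. Note the limitation the proposal hints at: the wrapped call itself is not interrupted, only abandoned.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// result pairs a received message with the error from the same call.
type result[T any] struct {
	msg T
	err error
}

// receiveWithContext is a hypothetical helper, not a connect-go API. It runs
// the blocking recv call in its own goroutine and returns early if ctx is done.
// The goroutine keeps running until recv itself returns, so something else
// must still unblock recv eventually (for a handler, returning from it).
func receiveWithContext[T any](ctx context.Context, recv func() (T, error)) (T, error) {
	ch := make(chan result[T], 1) // buffered so the goroutine never blocks on send
	go func() {
		msg, err := recv()
		ch <- result[T]{msg: msg, err: err}
	}()
	select {
	case r := <-ch:
		return r.msg, r.err
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

// demoTimedOut reports whether a receive that never gets a message is
// abandoned once the context deadline passes.
func demoTimedOut() bool {
	release := make(chan struct{})
	defer close(release) // lets the stand-in Receive goroutine finish

	// blocked stands in for a stream.Receive call with no incoming message.
	blocked := func() (string, error) {
		<-release
		return "", errors.New("stream closed")
	}

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	_, err := receiveWithContext(ctx, blocked)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demoTimedOut())
}
```

A caveat of this approach: if a message arrives after the context is canceled, it lands in the buffered channel and is silently dropped, so it only suits cases where the stream is about to be torn down anyway.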

Describe alternatives you've considered

Always call stream.Receive in another goroutine and sess.Receive in the handler goroutine, instead of vice-versa. In my example, sess does have a Close method which would cause sess.Receive() in the main goroutine to return with an error, similar to my proposal. And if it was otherwise a connect-go BidiStream client, it would also be closeable. I don't know if there is some hidden "always receive client messages in another goroutine" connect best practice, but intuitively there shouldn't be I guess.


@anuraaga anuraaga added the enhancement New feature or request label Feb 5, 2025
Member

jhump commented Feb 5, 2025

The handler closes the stream by returning. If you need to close it with an RPC error code, you simply have the handler return a non-nil error. Any concurrent calls to stream.Receive in other goroutines will abort with an EOF error.

Member

jhump commented Feb 5, 2025

FWIW, this library is built on net/http and the RPC handler routine is an http.Handler. The net/http package does not provide any way to terminate an in-progress HTTP operation other than exiting the handler.

If you really need to close it from a different goroutine, then you would need to install a trampoline as the RPC handler -- the trampoline just spawns another goroutine to perform the RPC logic and then waits for either it to exit or some other error signal (this can be arranged with channels). This would allow you to signal it to exit with an error from another goroutine, via sending something on a channel.

Below is an example of something like this. (Note that I just improv'ed all of this and haven't actually verified that it works. But it should at least be a useful outline for how to do this. It would need tests to fully flesh it out.)

// WithCloseableBidiStream returns a standard bidi-stream RPC handler function that
// adapts the call so that the given action can close the stream without returning. Note
// that this could result in extra resource-usage or even resource leaks, if the given
// action fails to terminate promptly after the stream is closed.
func WithCloseableBidiStream[Req, Resp any](
    action func(context.Context, *CloseableBidiStream[Req,Resp]) error,
) func(context.Context, *connect.BidiStream[Req,Resp]) error {
    return func(ctx context.Context, stream *connect.BidiStream[Req,Resp]) error {
        ch := make(chan error, 1)
        closeableStream := &CloseableBidiStream[Req,Resp]{BidiStream: stream, closeChan: ch}
        go func() {
            defer func() {
                if val := recover(); val != nil {
                    _ = closeableStream.CloseWithError(panicError{val})
                }
            }()
            err := action(ctx, closeableStream)
            _ = closeableStream.CloseWithError(err)
        }()
        err := <- ch
        if pErr, ok := err.(panicError); ok {
            panic(pErr.val)
        }
        return err
    }
}

// CloseableBidiStream is just like a connect.BidiStream except that it has a Close
// method which can be called to terminate a stream, as an alternative to the main
// handler function returning. See WithCloseableBidiStream.
type CloseableBidiStream[Req,Resp any] struct {
    *connect.BidiStream[Req,Resp]
    closed atomic.Bool
    closeChan chan error
}

// Close implements io.Closer. The client will receive an "OK" status for the
// stream. To send an RPC error code, use CloseWithError instead.
func (c *CloseableBidiStream[_,_]) Close() error {
    return c.CloseWithError(nil)
}

// CloseWithError closes the stream and optionally sends back an RPC error
// based on the given err. If err is nil, an "OK" status is sent.
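func (c *CloseableBidiStream[_,_]) CloseWithError(err error) error {
    if !c.closed.CompareAndSwap(false, true) {
        return errors.New("stream already closed")
    }
    // The channel has a buffer of one and only the first caller gets here,
    // so this send never blocks.
    c.closeChan <- err
    return nil
}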

type panicError struct {
    val any
}

func (p panicError) Error() string {
    return fmt.Sprintf("handler panicked: %v", p.val)
}

// Example usage
func (h *handler) SomeBidiRPCMethod(ctx context.Context, stream *connect.BidiStream[pb.Request, pb.Response]) error {
    return WithCloseableBidiStream(func(ctx context.Context, stream *CloseableBidiStream[pb.Request, pb.Response]) error {
        // Do some logic ... This could spawn some other goroutine that later wants to call stream.Close()
        err := doSomethingInteresting(ctx, stream)
        return err
    })(ctx, stream)
}
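
Since the outline above is untested, here is a stripped-down version of the same trampoline idea that runs with the standard library only. It is a sketch under assumptions: closer, trampoline, run, and errBackend are hypothetical stand-ins, a time.Sleep plays the role of a blocked Receive, and no connect types are involved. It only shows that the outer function returns as soon as another goroutine closes the stream, even though the action is still blocked.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// closer is a stand-in for the closeable stream: it keeps only the close-once
// logic and the channel that the trampoline waits on.
type closer struct {
	closed    atomic.Bool
	closeChan chan error
}

// CloseWithError records the first close and ignores later ones.
func (c *closer) CloseWithError(err error) error {
	if !c.closed.CompareAndSwap(false, true) {
		return errors.New("stream already closed")
	}
	c.closeChan <- err // never blocks: buffer of 1 and at most one send
	return nil
}

// trampoline runs action on its own goroutine and returns as soon as the
// stream is closed, whether by action returning or by an explicit close.
func trampoline(action func(*closer) error) error {
	c := &closer{closeChan: make(chan error, 1)}
	go func() {
		_ = c.CloseWithError(action(c))
	}()
	return <-c.closeChan
}

// errBackend is a hypothetical failure reported by a backend goroutine.
var errBackend = errors.New("backend session failed")

// run simulates a handler whose action is stuck in a long blocking call while
// a helper goroutine closes the stream with an error.
func run() error {
	return trampoline(func(c *closer) error {
		go func() {
			time.Sleep(10 * time.Millisecond)
			_ = c.CloseWithError(errBackend)
		}()
		// Stand-in for a Receive call that blocks much longer than that.
		time.Sleep(time.Second)
		return nil
	})
}

func main() {
	start := time.Now()
	err := run()
	fmt.Println(err, "returned early:", time.Since(start) < 500*time.Millisecond)
}
```

In this sketch the action goroutine outlives the call to run, which is the resource-usage caveat mentioned in the doc comment on WithCloseableBidiStream: the action still has to notice the close and exit on its own.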

For now, I will leave this issue open, since it's a reasonable feature request: perhaps something like the above belongs in this module as a supported, exported API, or in a companion module that contains useful helpers (similar to the kind of stuff in grpc-ecosystem/go-grpc-middleware).

Member

jhump commented Feb 5, 2025

> I don't know if there is some hidden "always receive client messages in another goroutine" connect best practice, but intuitively there shouldn't be I guess.

It totally depends on the RPC handler's needs. It is quite common practice to not put it in another goroutine, in which case the Receive call returns when (1) the client sends a message, (2) the client half-closes the stream, (3) the client cancels the RPC, or (4) a network partition occurs and the underlying transport breaks. For the latter two cases, the net/http server cancels the operation's context, which the Receive and Send methods of a stream will respect.
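
As a rough illustration of those four outcomes, a handler could sort the result of a Receive call along these lines. This is a sketch with a hypothetical helper (classifyReceiveErr), and it assumes that cancellation surfaces as an error that wraps the context error; the thread does not say exactly how connect-go reports it, so check the actual error before relying on this.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// classifyReceiveErr is a hypothetical helper that maps the error from a
// Receive call onto the cases listed above. Assumption: cases (3) and (4)
// show up as an error wrapping context.Canceled or context.DeadlineExceeded.
func classifyReceiveErr(err error) string {
	switch {
	case err == nil:
		return "message"
	case errors.Is(err, io.EOF):
		return "client half-closed"
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		return "context canceled"
	default:
		return "other error"
	}
}

// demo classifies one sample error per branch.
func demo() []string {
	samples := []error{
		nil,
		fmt.Errorf("receive: %w", io.EOF), // wrapped errors still match
		context.Canceled,
		errors.New("connection reset"),
	}
	out := make([]string, 0, len(samples))
	for _, err := range samples {
		out = append(out, classifyReceiveErr(err))
	}
	return out
}

func main() {
	for _, s := range demo() {
		fmt.Println(s)
	}
}
```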

But if your RPC logic needs to receive a message and may need to abort that call due to other circumstances, then it needs to do so in a separate goroutine.

I would likely recommend, in your case, just calling Receive in a different goroutine. The trampoline example above basically does that for you, since it implicitly invokes the action on a separate goroutine. But it is not without downsides (particularly how you coordinate goroutines and make sure that the action function will promptly terminate after the stream is closed).
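
A minimal sketch of that recommendation, using stand-ins so it runs without connect: receiver, idleStream, handle, and errBackend are all hypothetical names, and backendErrs represents whatever channel the backend side (the websocket session in this issue) uses to report failure. The Receive loop lives in its own goroutine and the handler goroutine simply waits for whichever side finishes first; returning from the handler is what ends the stream.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// receiver is a stand-in for the Receive half of a server-side stream.
type receiver interface {
	Receive() (string, error)
}

// handle sketches a handler body: Receive runs in its own goroutine, and the
// handler goroutine returns on the first failure or completion from either side.
func handle(stream receiver, backendErrs <-chan error, forward func(string) error) error {
	recvDone := make(chan error, 1) // buffered so the goroutine can always exit
	go func() {
		for {
			msg, err := stream.Receive()
			if errors.Is(err, io.EOF) {
				recvDone <- nil // client half-closed the stream
				return
			}
			if err != nil {
				recvDone <- fmt.Errorf("receiving message: %w", err)
				return
			}
			if err := forward(msg); err != nil {
				recvDone <- fmt.Errorf("forwarding message: %w", err)
				return
			}
		}
	}()
	select {
	case err := <-recvDone:
		return err
	case err := <-backendErrs:
		return fmt.Errorf("backend: %w", err)
	}
}

// idleStream is a fake whose Receive blocks until done is closed, like a
// client that never sends anything.
type idleStream struct{ done chan struct{} }

func (s idleStream) Receive() (string, error) {
	<-s.done
	return "", io.EOF
}

// errBackend is a hypothetical backend failure.
var errBackend = errors.New("websocket closed")

// run shows the handler returning on a backend error while Receive is blocked.
func run() error {
	stream := idleStream{done: make(chan struct{})}
	defer close(stream.done) // mimics Receive aborting once the handler returns
	backendErrs := make(chan error, 1)
	backendErrs <- errBackend
	return handle(stream, backendErrs, func(string) error { return nil })
}

func main() {
	err := run()
	fmt.Println(err, errors.Is(err, errBackend))
}
```

The buffered recvDone channel matters here: once the handler has returned, nobody reads from it, and an unbuffered send would leave the Receive goroutine stuck forever.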

Author

anuraaga commented Feb 6, 2025

Thanks for the insight @jhump - yeah something like your example was what first came to mind, but while reasoning through it I realized it's just simpler to move Receive into a different goroutine. It doesn't fit my "standard" mental model where the handler routine serves the client requests and spawns goroutines for backend logic as needed, but it doesn't matter too much in practice.

Thanks for keeping the issue open - yeah I'm familiar with some of the limitations of net/http, and hopefully a new version using channels for I/O comes around eventually (channels are so underutilized in the stdlib...). A helper module could be nice, but it may also be enough to add more insight on these limitations / patterns or workarounds to the documentation of methods such as Receive, something like: "Note that Receive will block until the client sends a message or closes the stream. If you need to be able to cancel receiving on the server side, you will likely need to run Receive in a separate goroutine from the handler and have the handler exit to cancel the stream."
