From be1e9d115ff48fb63da89fb633751bcb769679ef Mon Sep 17 00:00:00 2001 From: database64128 Date: Thu, 21 Nov 2024 11:34:51 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=AA=88=20http:=20use=20separate=20gorouti?= =?UTF-8?q?nes=20for=20request=20and=20response=20forwarding?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- http/server.go | 234 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 155 insertions(+), 79 deletions(-) diff --git a/http/server.go b/http/server.go index 51e7b67..ca2dd95 100644 --- a/http/server.go +++ b/http/server.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/database64128/shadowsocks-go/conn" @@ -54,61 +55,138 @@ func NewHttpStreamServerReadWriter(rw zerocopy.DirectReadWriteCloser, logger *za // Set up pipes. pl, pr := pipe.NewDuplexPipe() - // Spin up a goroutine to write processed requests to pl - // and read responses from pl. + // Spin up separate request and response forwarding goroutines. + // This is necessary for handling 100 Continue responses, and allows pipelining. go func() { - var err error + defer func() { + _ = pl.Close() + _ = rw.Close() + }() plbr := bufio.NewReader(pl) plbw := bufio.NewWriter(pl) rwbw := bufio.NewWriter(rw) rwbwpcw := newPipeClosingWriter(rwbw, pl) + reqCh := make(chan *http.Request, 16) // allow pipelining up to 16 requests - // The current implementation only supports a fixed destination host. - fixedHost := req.Host + var wg sync.WaitGroup + wg.Add(1) - for { - // Delete hop-by-hop headers specified in Connection. - connectionHeader := req.Header["Connection"] - for i := range connectionHeader { - req.Header.Del(connectionHeader[i]) - } - delete(req.Header, "Connection") + go func() { + defer wg.Done() + err := serverForwardRequests(req, reqCh, plbw, rwbr, logger) + pl.CloseWriteWithError(err) + }() - delete(req.Header, "Proxy-Connection") + serverForwardResponses(reqCh, plbr, rw, rwbw, rwbwpcw, logger) - // Write request. - if err = req.Write(plbw); err != nil { - err = fmt.Errorf("failed to write HTTP request: %w", err) - _ = send502(rw) - break - } + wg.Wait() + }() - // Flush request. - if err = plbw.Flush(); err != nil { - err = fmt.Errorf("failed to flush HTTP request: %w", err) - _ = send502(rw) - break + // Wrap pr into a direct stream ReadWriter. + return direct.NewDirectStreamReadWriter(pr), targetAddr, nil +} + +func serverForwardRequests(req *http.Request, reqCh chan<- *http.Request, plbw *bufio.Writer, rwbr *bufio.Reader, logger *zap.Logger) (err error) { + defer close(reqCh) + + // The current implementation only supports a fixed destination host. + fixedHost := req.Host + + for { + // Delete hop-by-hop headers specified in Connection. + // + // We might want to look at the Connection header here to handle Upgrade requests. + // In practice, browsers prefer SOCKS5 proxies for WebSocket connections, so we hold off + // on the extra complexity for now. + connectionHeader := req.Header["Connection"] + for i := range connectionHeader { + req.Header.Del(connectionHeader[i]) + } + delete(req.Header, "Connection") + + delete(req.Header, "Proxy-Connection") + + // Notify the response forwarding routine about the request before writing it out, + // so that a received 100 Continue response can be forwarded back to the client + // in time, unblocking the write. + reqCh <- req + + // Write request. + if err = req.Write(plbw); err != nil { + return fmt.Errorf("failed to write HTTP request: %w", err) + } + + // Flush request. + if err = plbw.Flush(); err != nil { + return fmt.Errorf("failed to flush HTTP request: %w", err) + } + + // No need to check req.Close here because http.ReadRequest will naturally + // fail with io.EOF when the client shuts down further writes. + + // Read request. + req, err = http.ReadRequest(rwbr) + if err != nil { + if err == io.EOF { + return nil } + return fmt.Errorf("failed to read HTTP request: %w", err) + } - var resp *http.Response + if ce := logger.Check(zap.DebugLevel, "Received subsequent HTTP request"); ce != nil { + ce.Write( + zap.String("proto", req.Proto), + zap.String("method", req.Method), + zap.String("url", req.RequestURI), + zap.String("host", req.Host), + zap.String("fixedHost", fixedHost), + zap.Int64("contentLength", req.ContentLength), + zap.Bool("close", req.Close), + ) + } + // Close the proxy connection if the destination host changes. + // This workaround is necessary until we migrate to using [*http.Client]. + if req.Host != fixedHost { + return fmt.Errorf("destination host changed from %q to %q", fixedHost, req.Host) + } + } +} + +func serverForwardResponses(reqCh <-chan *http.Request, plbr *bufio.Reader, rw zerocopy.DirectReadWriteCloser, rwbw *bufio.Writer, rwbwpcw *pipeClosingWriter, logger *zap.Logger) { + defer rw.CloseWrite() + + for req := range reqCh { + for { // Read response. - resp, err = http.ReadResponse(plbr, req) + resp, err := http.ReadResponse(plbr, req) if err != nil { - err = fmt.Errorf("failed to read HTTP response: %w", err) + logger.Warn("Failed to read HTTP response", + zap.String("reqProto", req.Proto), + zap.String("reqMethod", req.Method), + zap.String("reqURL", req.RequestURI), + zap.String("reqHost", req.Host), + zap.Int64("reqContentLength", req.ContentLength), + zap.Bool("reqClose", req.Close), + zap.Error(err), + ) _ = send502(rw) - break + return } if ce := logger.Check(zap.DebugLevel, "Received HTTP response"); ce != nil { ce.Write( - zap.String("url", req.RequestURI), - zap.String("host", req.Host), - zap.String("proto", resp.Proto), - zap.String("status", resp.Status), - zap.Int64("contentLength", resp.ContentLength), - zap.Bool("close", resp.Close), + zap.String("reqProto", req.Proto), + zap.String("reqMethod", req.Method), + zap.String("reqURL", req.RequestURI), + zap.String("reqHost", req.Host), + zap.Int64("reqContentLength", req.ContentLength), + zap.Bool("reqClose", req.Close), + zap.String("respProto", resp.Proto), + zap.String("respStatus", resp.Status), + zap.Int64("respContentLength", resp.ContentLength), + zap.Bool("respClose", resp.Close), ) } @@ -120,11 +198,13 @@ func NewHttpStreamServerReadWriter(rw zerocopy.DirectReadWriteCloser, logger *za if ce := logger.Check(zap.DebugLevel, "Checking HTTP 3xx response Location header"); ce != nil { ce.Write( - zap.String("url", req.RequestURI), - zap.String("host", req.Host), - zap.String("proto", resp.Proto), - zap.String("status", resp.Status), - zap.Strings("location", location), + zap.String("reqProto", req.Proto), + zap.String("reqMethod", req.Method), + zap.String("reqURL", req.RequestURI), + zap.String("reqHost", req.Host), + zap.String("respProto", resp.Proto), + zap.String("respStatus", resp.Status), + zap.Strings("respLocation", location), ) } @@ -138,7 +218,7 @@ func NewHttpStreamServerReadWriter(rw zerocopy.DirectReadWriteCloser, logger *za } switch url.Host { - case fixedHost, "": + case req.Host, "": default: resp.Close = true } @@ -154,60 +234,56 @@ func NewHttpStreamServerReadWriter(rw zerocopy.DirectReadWriteCloser, logger *za // // When we migrate to using [*http.Client], [*pipeClosingWriter] needs to be updated to cancel the request context instead. if err = resp.Write(rwbwpcw); err != nil { - err = fmt.Errorf("failed to write HTTP response: %w", err) - break + logger.Warn("Failed to write HTTP response", + zap.String("reqProto", req.Proto), + zap.String("reqMethod", req.Method), + zap.String("reqURL", req.RequestURI), + zap.String("reqHost", req.Host), + zap.Int64("reqContentLength", req.ContentLength), + zap.Bool("reqClose", req.Close), + zap.String("respProto", resp.Proto), + zap.String("respStatus", resp.Status), + zap.Int64("respContentLength", resp.ContentLength), + zap.Bool("respClose", resp.Close), + zap.Error(err), + ) + return } // Flush response. if err = rwbw.Flush(); err != nil { - err = fmt.Errorf("failed to flush HTTP response: %w", err) - break + logger.Warn("Failed to flush HTTP response", + zap.String("reqProto", req.Proto), + zap.String("reqMethod", req.Method), + zap.String("reqURL", req.RequestURI), + zap.String("reqHost", req.Host), + zap.Int64("reqContentLength", req.ContentLength), + zap.Bool("reqClose", req.Close), + zap.String("respProto", resp.Proto), + zap.String("respStatus", resp.Status), + zap.Int64("respContentLength", resp.ContentLength), + zap.Bool("respClose", resp.Close), + zap.Error(err), + ) + return } - // Stop relaying if either client or server indicates that the connection should be closed. + // Stop forwarding if either the client or server indicates that the connection should be closed. // // RFC 7230 section 6.6 says: // The server SHOULD send a "close" connection option in its final response on that connection. // // It's not a "MUST", so we check both. if req.Close || resp.Close { - break + return } - // Read request. - req, err = http.ReadRequest(rwbr) - if err != nil { - if err != io.EOF { - err = fmt.Errorf("failed to read HTTP request: %w", err) - } - break - } - - if ce := logger.Check(zap.DebugLevel, "Received subsequent HTTP request"); ce != nil { - ce.Write( - zap.String("proto", req.Proto), - zap.String("method", req.Method), - zap.String("url", req.RequestURI), - zap.String("host", req.Host), - zap.String("fixedHost", fixedHost), - zap.Int64("contentLength", req.ContentLength), - zap.Bool("close", req.Close), - ) - } - - // Close the proxy connection if the destination host changes. - // The client should seamlessly open a new connection. - if req.Host != fixedHost { + // 100 Continue is not the final response. + if resp.StatusCode != http.StatusContinue { break } } - - pl.CloseWithError(err) - rw.Close() - }() - - // Wrap pr into a direct stream ReadWriter. - return direct.NewDirectStreamReadWriter(pr), targetAddr, nil + } } var errEmptyHostHeader = errors.New("empty host header") @@ -235,12 +311,12 @@ func hostHeaderToAddr(host string) (conn.Addr, error) { } func send400(w io.Writer) error { - _, err := w.Write([]byte("HTTP/1.1 400 Bad Request\r\n\r\n")) + _, err := w.Write([]byte("HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n")) return err } func send502(w io.Writer) error { - _, err := w.Write([]byte("HTTP/1.1 502 Bad Gateway\r\n\r\n")) + _, err := w.Write([]byte("HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\n\r\n")) return err }