Skip to content

Commit

Permalink
🪈 http: use separate goroutines for request and response forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
database64128 committed Nov 21, 2024
1 parent d50a237 commit 1a3a6c5
Showing 1 changed file with 158 additions and 82 deletions.
240 changes: 158 additions & 82 deletions http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/database64128/shadowsocks-go/conn"
Expand Down Expand Up @@ -54,61 +55,140 @@ func NewHttpStreamServerReadWriter(rw zerocopy.DirectReadWriteCloser, logger *za
// Set up pipes.
pl, pr := pipe.NewDuplexPipe()

// Spin up a goroutine to write processed requests to pl
// and read responses from pl.
// Spin up separate request and response forwarding goroutines.
// This is necessary for handling 100 Continue responses, and allows pipelining.
go func() {
var err error
defer func() {
_ = pl.Close()
_ = rw.Close()
}()

plbr := bufio.NewReader(pl)
plbw := bufio.NewWriter(pl)
rwbw := bufio.NewWriter(rw)
rwbwpcw := newPipeClosingWriter(rwbw, pl)

// The current implementation only supports a fixed destination host.
fixedHost := req.Host
// We might want to look at the Connection header here to handle Upgrade requests.
// In practice, browsers prefer SOCKS5 proxies for WebSocket connections, so we hold off
// on the extra complexity for now.

for {
// Delete hop-by-hop headers specified in Connection.
connectionHeader := req.Header["Connection"]
for i := range connectionHeader {
req.Header.Del(connectionHeader[i])
}
delete(req.Header, "Connection")
// This allows pipelining up to 16 requests.
reqCh := make(chan *http.Request, 16)

delete(req.Header, "Proxy-Connection")
var wg sync.WaitGroup
wg.Add(1)

// Write request.
if err = req.Write(plbw); err != nil {
err = fmt.Errorf("failed to write HTTP request: %w", err)
_ = send502(rw)
break
}
go func() {
defer wg.Done()
err := serverForwardRequests(req, reqCh, plbw, rwbr, logger)
pl.CloseWriteWithError(err)
}()

// Flush request.
if err = plbw.Flush(); err != nil {
err = fmt.Errorf("failed to flush HTTP request: %w", err)
_ = send502(rw)
break
serverForwardResponses(reqCh, plbr, rw, rwbw, rwbwpcw, logger)

wg.Wait()
}()

// Wrap pr into a direct stream ReadWriter.
return direct.NewDirectStreamReadWriter(pr), targetAddr, nil
}

func serverForwardRequests(req *http.Request, reqCh chan<- *http.Request, plbw *bufio.Writer, rwbr *bufio.Reader, logger *zap.Logger) (err error) {
defer close(reqCh)

// The current implementation only supports a fixed destination host.
fixedHost := req.Host

for {
// Delete hop-by-hop headers specified in Connection.
connectionHeader := req.Header["Connection"]
for i := range connectionHeader {
req.Header.Del(connectionHeader[i])
}
delete(req.Header, "Connection")

delete(req.Header, "Proxy-Connection")

// Notify the response forwarding routine about the request before writing it out,
// so that a received 100 Continue response can be forwarded back to the client
// in time, unblocking the write.
reqCh <- req

// Write request.
if err = req.Write(plbw); err != nil {
return fmt.Errorf("failed to write HTTP request: %w", err)
}

// Flush request.
if err = plbw.Flush(); err != nil {
return fmt.Errorf("failed to flush HTTP request: %w", err)
}

// No need to check req.Close here because http.ReadRequest will naturally
// fail with io.EOF when the client shuts down further writes.

// Read request.
req, err = http.ReadRequest(rwbr)
if err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("failed to read HTTP request: %w", err)
}

if ce := logger.Check(zap.DebugLevel, "Received subsequent HTTP request"); ce != nil {
ce.Write(
zap.String("proto", req.Proto),
zap.String("method", req.Method),
zap.String("url", req.RequestURI),
zap.String("host", req.Host),
zap.String("fixedHost", fixedHost),
zap.Int64("contentLength", req.ContentLength),
zap.Bool("close", req.Close),
)
}

var resp *http.Response
// Close the proxy connection if the destination host changes.
// This workaround is necessary until we migrate to using [*http.Client].
if req.Host != fixedHost {
return fmt.Errorf("destination host changed from %q to %q", fixedHost, req.Host)
}
}
}

func serverForwardResponses(reqCh <-chan *http.Request, plbr *bufio.Reader, rw zerocopy.DirectReadWriteCloser, rwbw *bufio.Writer, rwbwpcw *pipeClosingWriter, logger *zap.Logger) {
defer rw.CloseWrite()

for req := range reqCh {
for {
// Read response.
resp, err = http.ReadResponse(plbr, req)
resp, err := http.ReadResponse(plbr, req)
if err != nil {
err = fmt.Errorf("failed to read HTTP response: %w", err)
logger.Warn("Failed to read HTTP response",
zap.String("reqProto", req.Proto),
zap.String("reqMethod", req.Method),
zap.String("reqURL", req.RequestURI),
zap.String("reqHost", req.Host),
zap.Int64("reqContentLength", req.ContentLength),
zap.Bool("reqClose", req.Close),
zap.Error(err),
)
_ = send502(rw)
break
return
}

if ce := logger.Check(zap.DebugLevel, "Received HTTP response"); ce != nil {
ce.Write(
zap.String("url", req.RequestURI),
zap.String("host", req.Host),
zap.String("proto", resp.Proto),
zap.String("status", resp.Status),
zap.Int64("contentLength", resp.ContentLength),
zap.Bool("close", resp.Close),
zap.String("reqProto", req.Proto),
zap.String("reqMethod", req.Method),
zap.String("reqURL", req.RequestURI),
zap.String("reqHost", req.Host),
zap.Int64("reqContentLength", req.ContentLength),
zap.Bool("reqClose", req.Close),
zap.String("respProto", resp.Proto),
zap.String("respStatus", resp.Status),
zap.Int64("respContentLength", resp.ContentLength),
zap.Bool("respClose", resp.Close),
)
}

Expand All @@ -120,11 +200,13 @@ func NewHttpStreamServerReadWriter(rw zerocopy.DirectReadWriteCloser, logger *za

if ce := logger.Check(zap.DebugLevel, "Checking HTTP 3xx response Location header"); ce != nil {
ce.Write(
zap.String("url", req.RequestURI),
zap.String("host", req.Host),
zap.String("proto", resp.Proto),
zap.String("status", resp.Status),
zap.Strings("location", location),
zap.String("reqProto", req.Proto),
zap.String("reqMethod", req.Method),
zap.String("reqURL", req.RequestURI),
zap.String("reqHost", req.Host),
zap.String("respProto", resp.Proto),
zap.String("respStatus", resp.Status),
zap.Strings("respLocation", location),
)
}

Expand All @@ -138,7 +220,7 @@ func NewHttpStreamServerReadWriter(rw zerocopy.DirectReadWriteCloser, logger *za
}

switch url.Host {
case fixedHost, "":
case req.Host, "":
default:
resp.Close = true
}
Expand All @@ -154,60 +236,54 @@ func NewHttpStreamServerReadWriter(rw zerocopy.DirectReadWriteCloser, logger *za
//
// When we migrate to using [*http.Client], [*pipeClosingWriter] needs to be updated to cancel the request context instead.
if err = resp.Write(rwbwpcw); err != nil {
err = fmt.Errorf("failed to write HTTP response: %w", err)
break
logger.Warn("Failed to write HTTP response",
zap.String("reqProto", req.Proto),
zap.String("reqMethod", req.Method),
zap.String("reqURL", req.RequestURI),
zap.String("reqHost", req.Host),
zap.Int64("reqContentLength", req.ContentLength),
zap.Bool("reqClose", req.Close),
zap.String("respProto", resp.Proto),
zap.String("respStatus", resp.Status),
zap.Int64("respContentLength", resp.ContentLength),
zap.Bool("respClose", resp.Close),
zap.Error(err),
)
return
}

// Flush response.
if err = rwbw.Flush(); err != nil {
err = fmt.Errorf("failed to flush HTTP response: %w", err)
break
logger.Warn("Failed to flush HTTP response",
zap.String("reqProto", req.Proto),
zap.String("reqMethod", req.Method),
zap.String("reqURL", req.RequestURI),
zap.String("reqHost", req.Host),
zap.Int64("reqContentLength", req.ContentLength),
zap.Bool("reqClose", req.Close),
zap.String("respProto", resp.Proto),
zap.String("respStatus", resp.Status),
zap.Int64("respContentLength", resp.ContentLength),
zap.Bool("respClose", resp.Close),
zap.Error(err),
)
return
}

// Stop relaying if either client or server indicates that the connection should be closed.
// Stop forwarding if the server indicates that the connection should be closed.
//
// RFC 7230 section 6.6 says:
// The server SHOULD send a "close" connection option in its final response on that connection.
//
// It's not a "MUST", so we check both.
if req.Close || resp.Close {
break
}

// Read request.
req, err = http.ReadRequest(rwbr)
if err != nil {
if err != io.EOF {
err = fmt.Errorf("failed to read HTTP request: %w", err)
}
break
}

if ce := logger.Check(zap.DebugLevel, "Received subsequent HTTP request"); ce != nil {
ce.Write(
zap.String("proto", req.Proto),
zap.String("method", req.Method),
zap.String("url", req.RequestURI),
zap.String("host", req.Host),
zap.String("fixedHost", fixedHost),
zap.Int64("contentLength", req.ContentLength),
zap.Bool("close", req.Close),
)
if resp.Close {
return
}

// Close the proxy connection if the destination host changes.
// The client should seamlessly open a new connection.
if req.Host != fixedHost {
// 100 Continue is not the final response.
if resp.StatusCode != http.StatusContinue {
break
}
}

pl.CloseWithError(err)
rw.Close()
}()

// Wrap pr into a direct stream ReadWriter.
return direct.NewDirectStreamReadWriter(pr), targetAddr, nil
}
}

var errEmptyHostHeader = errors.New("empty host header")
Expand Down Expand Up @@ -235,12 +311,12 @@ func hostHeaderToAddr(host string) (conn.Addr, error) {
}

func send400(w io.Writer) error {
_, err := w.Write([]byte("HTTP/1.1 400 Bad Request\r\n\r\n"))
_, err := w.Write([]byte("HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n"))
return err
}

func send502(w io.Writer) error {
_, err := w.Write([]byte("HTTP/1.1 502 Bad Gateway\r\n\r\n"))
_, err := w.Write([]byte("HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\n\r\n"))
return err
}

Expand Down

0 comments on commit 1a3a6c5

Please sign in to comment.