Fix "deadline exceeded" issues discovered in conformance tests (#643)
While adding timeout tests to the conformance suite,
several bugs were found in the connect-go client where
the error code would sometimes be `unavailable` or
`unknown` instead of `deadline_exceeded`. The incorrect
code had several sources. This commit adds a test for
identifying such issues (albeit a slow one) and fixes
everything that was found.
jhump authored Dec 7, 2023
1 parent cb84690 commit c9408f4
Showing 9 changed files with 339 additions and 19 deletions.
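
For context on the behavior being fixed: when an RPC's deadline elapses, the error returned to the caller should carry the `deadline_exceeded` code no matter where the timeout hit (dialing, writing, reading, or decompressing). Below is a minimal sketch of how a caller observes this, using the repo's internal ping service stubs (importable only from within this module) and a placeholder baseURL; it is an illustration, not part of the commit.

package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"connectrpc.com/connect"
	pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
	"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
)

func main() {
	// baseURL is a placeholder for a reachable PingService server.
	const baseURL = "http://localhost:8080"
	client := pingv1connect.NewPingServiceClient(http.DefaultClient, baseURL)

	// A very short deadline makes the RPC likely to time out somewhere in
	// the client's dial/write/read path.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()

	_, err := client.Ping(ctx, connect.NewRequest(&pingv1.PingRequest{Text: "foo"}))
	if err != nil {
		// After this commit, the reported code should consistently be
		// deadline_exceeded rather than unavailable or unknown.
		log.Printf("code=%v err=%v", connect.CodeOf(err), err)
	}
}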
37 changes: 32 additions & 5 deletions .github/workflows/ci.yaml
@@ -15,6 +15,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
# when editing this list, also update steps and jobs below
go-version: [1.19.x, 1.20.x, 1.21.x]
steps:
- name: Checkout Code
@@ -25,17 +26,43 @@ jobs:
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
- name: Test
run: make test
- name: Unit Test
run: make shorttest
- name: Lint
# Often, lint & gofmt guidelines depend on the Go version. To prevent
# conflicting guidance, run only on the most recent supported version.
# For the same reason, only check generated code on the most recent
# supported version.
if: matrix.go-version == '1.21.x'
run: make checkgenerate && make lint
conformance:
runs-on: ubuntu-latest
strategy:
matrix:
# 1.19 is omitted because conformance test runner requires 1.20+
go-version: [1.20.x, 1.21.x]
steps:
- name: Checkout Code
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Install Go
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
- name: Run Conformance Tests
# A dependency of conformance tests (for testing http/3) requires
# Go 1.20. So we skip this step for Go 1.19.
if: matrix.go-version != '1.19.x'
run: make runconformance
slowtest:
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Install Go
uses: actions/setup-go@v4
with:
# only the latest
go-version: 1.21.x
- name: Run Slow Tests
run: make slowtest
20 changes: 18 additions & 2 deletions Makefile
@@ -27,8 +27,24 @@ clean: ## Delete intermediate build artifacts
git clean -Xdf

.PHONY: test
test: build ## Run unit tests
go test -vet=off -race -cover ./...
test: shorttest slowtest

.PHONY: shorttest
shorttest: build ## Run unit tests
go test -vet=off -race -cover -short ./...

.PHONY: slowtest
# Runs all tests, including known long/slow ones. The
# race detector is not used for a few reasons:
# 1. Race coverage of the short tests should be
# adequate to catch race conditions.
# 2. It slows tests down, which is not good if we
# know these are already slow tests.
# 3. Some of the slow tests can't repro issues and
# find regressions as reliably with the race
# detector enabled.
slowtest: build
go test ./...

.PHONY: runconformance
runconformance: build ## Run conformance test suite
245 changes: 245 additions & 0 deletions client_ext_test.go
@@ -15,18 +15,28 @@
package connect_test

import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

connect "connectrpc.com/connect"
"connectrpc.com/connect/internal/assert"
pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1"
"connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect"
"connectrpc.com/connect/internal/memhttp/memhttptest"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
"google.golang.org/protobuf/types/dynamicpb"
@@ -421,6 +431,229 @@ func TestDynamicClient(t *testing.T) {
})
}

func TestClientDeadlineHandling(t *testing.T) {
t.Parallel()
if testing.Short() {
t.Skip("skipping slow test")
}

// Note that these tests are not able to reproduce issues with the race
// detector enabled. That's partly why the makefile only runs "slow"
// tests with the race detector disabled.

_, handler := pingv1connect.NewPingServiceHandler(pingServer{})
svr := httptest.NewUnstartedServer(http.HandlerFunc(func(respWriter http.ResponseWriter, req *http.Request) {
if req.Context().Err() != nil {
return
}
handler.ServeHTTP(respWriter, req)
}))
svr.EnableHTTP2 = true
svr.StartTLS()
t.Cleanup(svr.Close)

// This case creates a new connection for each RPC to verify that timeouts during dialing
// won't cause issues. This is historically easier to reproduce, so it uses a smaller
// duration, no concurrency, and fewer iterations. This is important because if we used
// a new connection for each RPC in the bigger test scenario below, we'd encounter other
// issues related to overwhelming the loopback interface and exhausting ephemeral ports.
t.Run("dial", func(t *testing.T) {
t.Parallel()
transport, ok := svr.Client().Transport.(*http.Transport)
if !assert.True(t, ok) {
t.FailNow()
}
testClientDeadlineBruteForceLoop(t,
5*time.Second, 5, 1,
func(ctx context.Context) (string, rpcErrors) {
httpClient := &http.Client{
Transport: transport.Clone(),
}
client := pingv1connect.NewPingServiceClient(httpClient, svr.URL)
_, err := client.Ping(ctx, connect.NewRequest(&pingv1.PingRequest{Text: "foo"}))
// Close all connections and make sure to give a little time for the OS to
// release socket resources to prevent resource exhaustion (such as running
// out of ephemeral ports).
httpClient.CloseIdleConnections()
time.Sleep(time.Millisecond / 2)
return pingv1connect.PingServicePingProcedure, rpcErrors{recvErr: err}
},
)
})

// This case creates significantly more load than the above one, but uses a normal
// client, so it pools and re-uses connections. It also uses all stream types to send
// messages, to make sure that all stream implementations handle deadlines correctly.
// The I/O errors related to deadlines are historically harder to reproduce, so it
// throws a lot more effort into reproducing, particularly a longer duration for
// which it will run. It also uses larger messages (by packing requests with
// unrecognized fields) and compression, to make it more likely to encounter the
// deadline in the middle of read and write operations.
t.Run("read-write", func(t *testing.T) {
t.Parallel()

var extraField []byte
extraField = protowire.AppendTag(extraField, 999, protowire.BytesType)
extraData := make([]byte, 16*1024)
// use good random data so it's not very compressible
if _, err := rand.Read(extraData); err != nil {
t.Fatalf("failed to generate extra payload: %v", err)
return
}
extraField = protowire.AppendBytes(extraField, extraData)

clientConnect := pingv1connect.NewPingServiceClient(svr.Client(), svr.URL, connect.WithSendGzip())
clientGRPC := pingv1connect.NewPingServiceClient(svr.Client(), svr.URL, connect.WithSendGzip(), connect.WithGRPCWeb())
var count atomic.Int32
testClientDeadlineBruteForceLoop(t,
20*time.Second, 200, runtime.GOMAXPROCS(0),
func(ctx context.Context) (string, rpcErrors) {
var procedure string
var errs rpcErrors
rpcNum := count.Add(1)
var client pingv1connect.PingServiceClient
if rpcNum&4 == 0 {
client = clientConnect
} else {
client = clientGRPC
}
switch rpcNum & 3 {
case 0:
procedure = pingv1connect.PingServicePingProcedure
_, errs.recvErr = client.Ping(ctx, connect.NewRequest(addUnrecognizedBytes(&pingv1.PingRequest{Text: "foo"}, extraField)))
case 1:
procedure = pingv1connect.PingServiceSumProcedure
stream := client.Sum(ctx)
for i := 0; i < 3; i++ {
errs.sendErr = stream.Send(addUnrecognizedBytes(&pingv1.SumRequest{Number: 1}, extraField))
if errs.sendErr != nil {
break
}
}
_, errs.recvErr = stream.CloseAndReceive()
case 2:
procedure = pingv1connect.PingServiceCountUpProcedure
var stream *connect.ServerStreamForClient[pingv1.CountUpResponse]
stream, errs.recvErr = client.CountUp(ctx, connect.NewRequest(addUnrecognizedBytes(&pingv1.CountUpRequest{Number: 3}, extraField)))
if errs.recvErr == nil {
for stream.Receive() {
}
errs.recvErr = stream.Err()
errs.closeRecvErr = stream.Close()
}
case 3:
procedure = pingv1connect.PingServiceCumSumProcedure
stream := client.CumSum(ctx)
for i := 0; i < 3; i++ {
errs.sendErr = stream.Send(addUnrecognizedBytes(&pingv1.CumSumRequest{Number: 1}, extraField))
_, errs.recvErr = stream.Receive()
if errs.recvErr != nil {
break
}
}
errs.closeSendErr = stream.CloseRequest()
errs.closeRecvErr = stream.CloseResponse()
}
return procedure, errs
},
)
})
}

func testClientDeadlineBruteForceLoop(
t *testing.T,
duration time.Duration,
iterationsPerDeadline int,
parallelism int,
loopBody func(ctx context.Context) (string, rpcErrors),
) {
t.Helper()
testContext, testCancel := context.WithTimeout(context.Background(), duration)
defer testCancel()
var rpcCount atomic.Int64

var wg sync.WaitGroup
for goroutine := 0; goroutine < parallelism; goroutine++ {
goroutine := goroutine
wg.Add(1)
go func() {
defer wg.Done()
// We try a range of timeouts since the timing issue is sensitive
// to execution environment (e.g. CPU, memory, and network speeds).
// So the lower timeout values may be more likely to trigger an issue
// in faster environments; higher timeouts for slower environments.
const minTimeout = 10 * time.Microsecond
const maxTimeout = 2 * time.Millisecond
for {
for timeout := minTimeout; timeout <= maxTimeout; timeout += 10 * time.Microsecond {
for i := 0; i < iterationsPerDeadline; i++ {
if testContext.Err() != nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
// We are intentionally not inheriting from testContext, which signals when the
// test loop should stop and return but need not influence the RPC deadline.
proc, errs := loopBody(ctx) //nolint:contextcheck
rpcCount.Add(1)
cancel()
type errCase struct {
err error
name string
allowEOF bool
}
errCases := []errCase{
{
err: errs.sendErr,
name: "send error",
allowEOF: true,
},
{
err: errs.recvErr,
name: "receive error",
},
{
err: errs.closeSendErr,
name: "close-send error",
},
{
err: errs.closeRecvErr,
name: "close-receive error",
},
}
for _, errCase := range errCases {
err := errCase.err
if err == nil {
// operation completed before timeout, try again
continue
}
if errCase.allowEOF && errors.Is(err, io.EOF) {
continue
}

if !assert.Equal(t, connect.CodeOf(err), connect.CodeDeadlineExceeded) {
var buf bytes.Buffer
_, _ = fmt.Fprintf(&buf, "actual %v from %s: %v\n%#v", errCase.name, proc, err, err)
for {
err = errors.Unwrap(err)
if err == nil {
break
}
_, _ = fmt.Fprintf(&buf, "\n caused by: %#v", err)
}
t.Log(buf.String())
testCancel()
}
}
}
}
t.Logf("goroutine %d: repeating duration loop", goroutine)
}
}()
}
wg.Wait()
t.Logf("Issued %d RPCs.", rpcCount.Load())
}

type notModifiedPingServer struct {
pingv1connect.UnimplementedPingServiceHandler

@@ -515,3 +748,15 @@ func (a *assertSchemaInterceptor) WrapStreamingHandler(next connect.StreamingHan
return next(ctx, conn)
}
}

type rpcErrors struct {
sendErr error
recvErr error
closeSendErr error
closeRecvErr error
}

func addUnrecognizedBytes[M proto.Message](msg M, data []byte) M {
msg.ProtoReflect().SetUnknown(data)
return msg
}
10 changes: 9 additions & 1 deletion compression.go
@@ -90,6 +90,10 @@ func (c *compressionPool) Decompress(dst *bytes.Buffer, src *bytes.Buffer, readM
bytesRead, err := dst.ReadFrom(reader)
if err != nil {
_ = c.putDecompressor(decompressor)
err = wrapIfContextError(err)
if connectErr, ok := asError(err); ok {
return connectErr
}
return errorf(CodeInvalidArgument, "decompress: %w", err)
}
if readMaxBytes > 0 && bytesRead > readMaxBytes {
@@ -111,8 +115,12 @@ func (c *compressionPool) Compress(dst *bytes.Buffer, src *bytes.Buffer) *Error
if err != nil {
return errorf(CodeUnknown, "get compressor: %w", err)
}
if _, err := io.Copy(compressor, src); err != nil {
if _, err := src.WriteTo(compressor); err != nil {
_ = c.putCompressor(compressor)
err = wrapIfContextError(err)
if connectErr, ok := asError(err); ok {
return connectErr
}
return errorf(CodeInternal, "compress: %w", err)
}
if err := c.putCompressor(compressor); err != nil {
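
The compression changes above follow a pattern used throughout this fix: an error surfacing from a reader or writer during (de)compression may really be a context error, and wrapping it unconditionally as `invalid_argument` or `internal` would hide the `deadline_exceeded` code. The internal `wrapIfContextError` and `asError` helpers are not shown in this diff; the sketch below is a hypothetical stand-in for the idea, not the library's actual implementation.

package main

import (
	"context"
	"errors"
	"fmt"

	"connectrpc.com/connect"
)

// toConnectError is a hypothetical helper: it promotes context errors to
// their canonical Connect codes and preserves an existing *connect.Error
// before falling back to a generic code such as CodeInternal.
func toConnectError(err error, fallback connect.Code) *connect.Error {
	var connectErr *connect.Error
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return connect.NewError(connect.CodeDeadlineExceeded, err)
	case errors.Is(err, context.Canceled):
		return connect.NewError(connect.CodeCanceled, err)
	case errors.As(err, &connectErr):
		return connectErr // already carries a meaningful code
	default:
		return connect.NewError(fallback, err)
	}
}

func main() {
	err := fmt.Errorf("compress: %w", context.DeadlineExceeded)
	fmt.Println(connect.CodeOf(toConnectError(err, connect.CodeInternal))) // deadline_exceeded
}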
2 changes: 1 addition & 1 deletion connect.go
@@ -377,7 +377,7 @@ func receiveUnaryResponse[T any](conn StreamingClientConn, initializer maybeInit
if err := conn.Receive(&msg2); err == nil {
return nil, NewError(CodeUnknown, errors.New("unary stream has multiple messages"))
} else if err != nil && !errors.Is(err, io.EOF) {
return nil, NewError(CodeUnknown, err)
return nil, err
}
return &Response[T]{
Msg: &msg,
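
The connect.go change is small but important: the error returned by Receive may already be a *connect.Error with the right code, and re-wrapping it with CodeUnknown discards that code. A small illustrative sketch (the scenario is constructed here, not taken from the library's tests):

package main

import (
	"context"
	"fmt"

	"connectrpc.com/connect"
)

func main() {
	// Suppose Receive failed with an error that lower layers already
	// classified as a deadline problem.
	recvErr := connect.NewError(connect.CodeDeadlineExceeded, context.DeadlineExceeded)

	// The old code re-wrapped it, so callers saw "unknown".
	rewrapped := connect.NewError(connect.CodeUnknown, recvErr)
	fmt.Println(connect.CodeOf(rewrapped)) // unknown

	// Returning the error unchanged, as the fix does, preserves the code.
	fmt.Println(connect.CodeOf(recvErr)) // deadline_exceeded
}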