Skip to content

Commit

Permalink
More tracer improvements (#765)
Browse files Browse the repository at this point in the history
This contains a few fixes/improvements to the tracing stuff:
* Previously, the round tripper might not print the `Content-Length`
header. This is because the value may come from the `ContentLength`
(of `http.Request` or `http.Response`) instead of headers. So now, the
code will look at the `ContentLength` field and decide if it needs to emit
another header line for it in the trace output.
* Previously, after an operation was cancelled, it could still show
events that didn't _really_ happen. When the operation is cancelled,
either the HTTP/2 stream has been cancelled or the HTTP 1.1 connection
has been terminated. Either way, there is no other activity. But the
`http.ResponseWriter` that captures trace events would keep trying to
add events to the trace anyway.
* In the `testResults` type in the conformance runner, it would wait for
all traces to be available (up to 5 seconds) before reporting results
(so the printed results can include trace details). But some operations
will never have a trace coming -- in particular, the ones that interact
with the grpc-go reference client or server. So now the runner is
updated so that it won't bother waiting for those traces, since they
aren't coming. When running just a few test cases, that include the
grpc-go impls, this makes the tests run faster since there's no longer a
pause of a few seconds right before reporting results.


This also includes an unrelated change to the cancellation test cases.
The "full duplex cancel after close send" case was racy: when the client
closed-send, the server would see that, try to complete the RPC, and
return the result, all concurrently with the client subsequently trying
to cancel the operation. So if the client cancellation is slow enough,
the client could get back the RPC result before it send (and the server
saw) the cancellation signal. This was happening pretty regularly with
the connect-kotlin client. So now, the test case includes some delay,
and the server will try to send one more response before completing the
RPC. That gives a nice 200 millisecond cushion so that the client should
always successfully cancel before the server can finish the RPC.
  • Loading branch information
jhump authored Jan 30, 2024
1 parent 02b4fd2 commit e002318
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 90 deletions.
60 changes: 0 additions & 60 deletions internal/app/connectconformance/connectconformance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"io"
"os"
"path"
"sort"
"strconv"
"strings"
Expand All @@ -35,7 +34,6 @@ import (
conformancev1 "connectrpc.com/conformance/internal/gen/proto/go/connectrpc/conformance/v1"
"connectrpc.com/conformance/internal/tracer"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/proto"
)

// Flags are the config values for the test runner that may be provided via
Expand Down Expand Up @@ -419,64 +417,6 @@ func logTestCaseInfo(with string, svrInstance serverInstance, numCases int, logP
numCases, with, svrInstance.httpVersion, svrInstance.protocol, tlsMode)
}

func filterGRPCImplTestCases(testCases []*conformancev1.TestCase, clientIsGRPCImpl, serverIsGRPCImpl bool) []*conformancev1.TestCase {
if !clientIsGRPCImpl && !serverIsGRPCImpl {
return testCases
}

// The gRPC reference impls do not support everything that the main reference impls do.
// So we must filter away any test cases that aren't applicable to the gRPC impls.

filtered := make([]*conformancev1.TestCase, 0, len(testCases))
for _, testCase := range testCases {
// Client only supports gRPC protocol. Server also supports gRPC-Web.
if clientIsGRPCImpl && testCase.Request.Protocol != conformancev1.Protocol_PROTOCOL_GRPC ||
testCase.Request.Protocol == conformancev1.Protocol_PROTOCOL_CONNECT {
continue
}

if testCase.Request.Protocol == conformancev1.Protocol_PROTOCOL_GRPC_WEB {
// grpc-web supports HTTP/1 and HTTP/2
switch testCase.Request.HttpVersion {
case conformancev1.HTTPVersion_HTTP_VERSION_1, conformancev1.HTTPVersion_HTTP_VERSION_2:
default:
continue
}
} else if testCase.Request.HttpVersion != conformancev1.HTTPVersion_HTTP_VERSION_2 {
// but grpc only supports HTTP/2
continue
}

if testCase.Request.Codec != conformancev1.Codec_CODEC_PROTO {
continue
}
if testCase.Request.Compression != conformancev1.Compression_COMPRESSION_IDENTITY &&
testCase.Request.Compression != conformancev1.Compression_COMPRESSION_GZIP {
continue
}

if len(testCase.Request.ServerTlsCert) > 0 {
continue
}

filteredCase := proto.Clone(testCase).(*conformancev1.TestCase) //nolint:errcheck,forcetypeassert
// Insert a path in the test name to indicate that this is against the gRPC impl.
dir, base := path.Dir(filteredCase.Request.TestName), path.Base(filteredCase.Request.TestName)
var elem string
switch {
case clientIsGRPCImpl && serverIsGRPCImpl:
elem = "(grpc impls)"
case clientIsGRPCImpl:
elem = "(grpc client impl)"
case serverIsGRPCImpl:
elem = "(grpc server impl)"
}
filteredCase.Request.TestName = path.Join(dir, elem, base)
filtered = append(filtered, filteredCase)
}
return filtered
}

func tryMatchPatterns(what string, patterns *testTrie, testCases []*conformancev1.TestCase) error {
for _, tc := range testCases {
patterns.matchPattern(tc.Request.TestName)
Expand Down
4 changes: 2 additions & 2 deletions internal/app/connectconformance/connectconformance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func TestRun(t *testing.T) {
allPermutations := testCaseLib.allPermutations(true, true)
expectedNumCases := len(allPermutations)

// 19 test cases as of this writing, but we will likely add more
require.GreaterOrEqual(t, expectedNumCases, 40)
// 53 test cases as of this writing, but we will likely add more
require.GreaterOrEqual(t, expectedNumCases, 53)

logger := &testPrinter{t}
results, err := run(
Expand Down
8 changes: 8 additions & 0 deletions internal/app/connectconformance/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ func (r *testResults) fetchTrace(testCase string) {
if r.tracer == nil {
return
}
if strings.Contains(testCase, grpcImplMarker) ||
strings.Contains(testCase, grpcClientImplMarker) ||
strings.Contains(testCase, grpcServerImplMarker) {
// No trace coming from the grpc-go impls.
r.tracer.Clear(testCase)
return
}

r.traceWaitGroup.Add(1)
go func() {
defer r.traceWaitGroup.Done()
Expand Down
65 changes: 65 additions & 0 deletions internal/app/connectconformance/test_case_library.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ const (
// response (which echoes back the request data) won't exceed client limit.
clientReceiveLimit = 1024 * 1024 // 1 MB
serverReceiveLimit = 200 * 1024 // 200 KB

// these are inserted into test case permutation names for the permutations
// of a test case that use the grpc-go implementation of a reference client
// or server.
grpcImplMarker = "(grpc impls)"
grpcClientImplMarker = "(grpc client impl)"
grpcServerImplMarker = "(grpc server impl)"
)

//nolint:gochecknoglobals
Expand Down Expand Up @@ -705,6 +712,64 @@ func generateTestCasePrefix(suite *conformancev1.TestSuite, cfgCase configCase)
return components
}

func filterGRPCImplTestCases(testCases []*conformancev1.TestCase, clientIsGRPCImpl, serverIsGRPCImpl bool) []*conformancev1.TestCase {
if !clientIsGRPCImpl && !serverIsGRPCImpl {
return testCases
}

// The gRPC reference impls do not support everything that the main reference impls do.
// So we must filter away any test cases that aren't applicable to the gRPC impls.

filtered := make([]*conformancev1.TestCase, 0, len(testCases))
for _, testCase := range testCases {
// Client only supports gRPC protocol. Server also supports gRPC-Web.
if clientIsGRPCImpl && testCase.Request.Protocol != conformancev1.Protocol_PROTOCOL_GRPC ||
testCase.Request.Protocol == conformancev1.Protocol_PROTOCOL_CONNECT {
continue
}

if testCase.Request.Protocol == conformancev1.Protocol_PROTOCOL_GRPC_WEB {
// grpc-web supports HTTP/1 and HTTP/2
switch testCase.Request.HttpVersion {
case conformancev1.HTTPVersion_HTTP_VERSION_1, conformancev1.HTTPVersion_HTTP_VERSION_2:
default:
continue
}
} else if testCase.Request.HttpVersion != conformancev1.HTTPVersion_HTTP_VERSION_2 {
// but grpc only supports HTTP/2
continue
}

if testCase.Request.Codec != conformancev1.Codec_CODEC_PROTO {
continue
}
if testCase.Request.Compression != conformancev1.Compression_COMPRESSION_IDENTITY &&
testCase.Request.Compression != conformancev1.Compression_COMPRESSION_GZIP {
continue
}

if len(testCase.Request.ServerTlsCert) > 0 {
continue
}

filteredCase := proto.Clone(testCase).(*conformancev1.TestCase) //nolint:errcheck,forcetypeassert
// Insert a path in the test name to indicate that this is against the gRPC impl.
dir, base := path.Dir(filteredCase.Request.TestName), path.Base(filteredCase.Request.TestName)
var elem string
switch {
case clientIsGRPCImpl && serverIsGRPCImpl:
elem = grpcImplMarker
case clientIsGRPCImpl:
elem = grpcClientImplMarker
case serverIsGRPCImpl:
elem = grpcServerImplMarker
}
filteredCase.Request.TestName = path.Join(dir, elem, base)
filtered = append(filtered, filteredCase)
}
return filtered
}

func allValues[T ~int32](m map[int32]string) []T {
vals := make([]T, 0, len(m))
for k := range m {
Expand Down
4 changes: 4 additions & 0 deletions internal/app/connectconformance/testsuites/cancellation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,11 @@ testCases:
requestMessages:
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
responseDefinition:
responseDelayMs: 200
responseData:
- "dGVzdCByZXNwb25zZQ=="
- "dGVzdCByZXNwb25zZQ=="
- "dGVzdCByZXNwb25zZQ=="
fullDuplex: true
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
requestData: "dGVzdCByZXNwb25zZQ=="
Expand All @@ -216,9 +218,11 @@ testCases:
requests:
- "@type": type.googleapis.com/connectrpc.conformance.v1.BidiStreamRequest
responseDefinition:
responseDelayMs: 200
responseData:
- "dGVzdCByZXNwb25zZQ=="
- "dGVzdCByZXNwb25zZQ=="
- "dGVzdCByZXNwb25zZQ=="
fullDuplex: true
- data: "dGVzdCByZXNwb25zZQ=="
requestInfo:
Expand Down
12 changes: 11 additions & 1 deletion internal/app/referenceserver/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"

"connectrpc.com/conformance/internal"
"connectrpc.com/conformance/internal/compression"
Expand All @@ -43,6 +44,8 @@ const (
)

func referenceServerChecks(handler http.Handler, errPrinter internal.Printer) http.HandlerFunc {
var callsMu sync.Mutex
calls := map[string]int{}
return func(respWriter http.ResponseWriter, req *http.Request) {
testCaseName := req.Header.Get("x-test-case-name")
if testCaseName == "" {
Expand All @@ -51,9 +54,16 @@ func referenceServerChecks(handler http.Handler, errPrinter internal.Printer) ht
http.Error(respWriter, "missing x-test-case-name header", http.StatusBadRequest)
return
}

feedback := &feedbackPrinter{p: errPrinter, testCaseName: testCaseName}

callsMu.Lock()
count := calls[testCaseName]
calls[testCaseName] = count + 1
callsMu.Unlock()
if count > 0 {
feedback.Printf("client sent another request (#%d) for the same test case", count+1)
}

if httpVersion, ok := enumValue("x-expect-http-version", req.Header, v1.HTTPVersion(0), feedback); ok {
checkHTTPVersion(httpVersion, req, feedback)
}
Expand Down
32 changes: 22 additions & 10 deletions internal/tracer/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,30 +48,42 @@ func newBuilder(req *http.Request, collector Collector) *builder {

// add adds the given event to the trace being built.
func (b *builder) add(event Event) {
var finish bool
defer func() {
// must not call b.build() below, while lock held,
// so we do so from deferred function, after lock released
if finish {
b.build()
}
}()
b.mu.Lock()
defer b.mu.Unlock()
if b.trace.TestName == "" {
return
}
switch event := event.(type) {
case *ResponseStart:
b.trace.Response = event.Response
case *ResponseError:
b.trace.Err = event.Err
case *RequestBodyData:
event.MessageIndex = b.reqCount
b.reqCount++
case *RequestBodyEnd:
if b.trace.Err != nil {
b.trace.Err = event.Err
}
case *ResponseStart:
b.trace.Response = event.Response
case *ResponseError:
b.trace.Err = event.Err
finish = true
case *ResponseBodyData:
event.MessageIndex = b.respCount
b.respCount++
case *ResponseBodyEnd:
if b.trace.Err != nil {
b.trace.Err = event.Err
}
case *RequestBodyData:
event.MessageIndex = b.reqCount
b.reqCount++
case *ResponseBodyData:
event.MessageIndex = b.respCount
b.respCount++
finish = true
case *RequestCanceled:
finish = true
}
event.setEventOffset(time.Since(b.start))
b.trace.Events = append(b.trace.Events, event)
Expand Down
43 changes: 33 additions & 10 deletions internal/tracer/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tracer

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
Expand All @@ -28,17 +29,22 @@ import (
func TracingRoundTripper(transport http.RoundTripper, collector Collector) http.RoundTripper {
return roundTripperFunc(func(req *http.Request) (*http.Response, error) {
builder := newBuilder(req, collector)
req = req.Clone(req.Context())
req.Body = newReader(req.Header, req.Body, true, builder)
ctx, cancel := context.WithCancel(req.Context())
go func() {
<-ctx.Done()
builder.add(&RequestCanceled{})
}()
req = req.Clone(ctx)
req.Body = newRequestReader(req.Header, req.Body, true, builder)
resp, err := transport.RoundTrip(req)
if err != nil {
builder.add(&ResponseError{Err: err})
builder.build()
cancel()
return nil, err
}
builder.add(&ResponseStart{Response: resp})
respClone := *resp
respClone.Body = newReader(resp.Header, resp.Body, false, builder)
respClone.Body = newReader(resp.Header, resp.Body, false, builder, cancel)
return &respClone, nil
})
}
Expand All @@ -47,21 +53,39 @@ func TracingRoundTripper(transport http.RoundTripper, collector Collector) http.
// handler will record traces of all operations to the given tracer.
func TracingHandler(handler http.Handler, collector Collector) http.Handler {
return http.HandlerFunc(func(respWriter http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
builder := newBuilder(req, collector)
req = req.Clone(req.Context())
req.Body = newReader(req.Header, req.Body, true, builder)
defer builder.build() // make sure the trace is complete before returning
go func() {
<-ctx.Done()
builder.add(&RequestCanceled{})
}()

req = req.Clone(ctx)
req.Body = newRequestReader(req.Header, req.Body, true, builder)
traceWriter := &tracingResponseWriter{
respWriter: respWriter,
req: req,
builder: builder,
}
defer func() {
var err error
panicVal := recover()
if panicVal != nil {
err = fmt.Errorf("panic: %v", panicVal)
}
traceWriter.tryFinish(err)
if panicVal != nil {
//nolint:forbidigo // just propagating existing panic
panic(panicVal)
}
}()

handler.ServeHTTP(
traceWriter,
req,
)

traceWriter.tryFinish(nil)
})
}

Expand Down Expand Up @@ -167,9 +191,8 @@ func (t *tracingResponseWriter) tryFinish(err error) {

t.finished = true
t.dataTracer.emitUnfinished()
t.builder.add(&ResponseBodyEnd{Err: err})
t.setTrailers()
t.builder.build()
t.builder.add(&ResponseBodyEnd{Err: err})
}

func (t *tracingResponseWriter) setTrailers() {
Expand Down
Loading

0 comments on commit e002318

Please sign in to comment.