Skip to content

Commit

Permalink
Original vs hedged request metrics (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg authored Aug 10, 2023
1 parent a52360e commit 6504b04
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 1 deletion.
3 changes: 2 additions & 1 deletion examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"time"
Expand Down Expand Up @@ -57,7 +58,7 @@ func ExampleRoundTripper() {
go func() {
for {
time.Sleep(time.Second)
fmt.Printf("stats: %+v\n", stats)
fmt.Fprintf(io.Discard, "all requests: %d\n", stats.ActualRoundTrips())
}
}()

Expand Down
5 changes: 5 additions & 0 deletions hedged.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ func (ht *hedgedTransport) RoundTrip(req *http.Request) (*http.Response, error)
switch {
case resp.Resp != nil:
resultIdx = resp.Index
if resultIdx == 0 {
ht.metrics.originalRequestWinsInc()
} else {
ht.metrics.hedgedRequestWinsInc()
}
return resp.Resp, nil
case mainCtx.Err() != nil:
ht.metrics.canceledByUserRoundTripsInc()
Expand Down
122 changes: 122 additions & 0 deletions hedged_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,128 @@ func TestBestResponse(t *testing.T) {
}
}

func TestOriginalResponseWins(t *testing.T) {
const shortest = 20 * time.Millisecond
timeouts := [...]time.Duration{shortest, 30 * shortest, 5 * shortest, shortest, shortest, shortest}
timeoutCh := make(chan time.Duration, len(timeouts))
for _, t := range timeouts {
timeoutCh <- t
}

url := testServerURL(t, func(w http.ResponseWriter, r *http.Request) {
time.Sleep(<-timeoutCh)
})

start := time.Now()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
if err != nil {
t.Fatal(err)
}
client, metrics, err := hedgedhttp.NewClientAndStats(10*time.Millisecond, 5, nil)
if err != nil {
t.Fatalf("want nil, got %s", err)
}

checkAllMetricsAreZero(t, metrics)
resp, err := client.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

passed := time.Since(start)

if float64(passed) > float64(shortest)*2.5 {
t.Fatalf("want %v, got %v", shortest, passed)
}
if requestedRoundTrips := metrics.RequestedRoundTrips(); requestedRoundTrips != 1 {
t.Fatalf("Unnexpected requestedRoundTrips: %v", requestedRoundTrips)
}
if actualRoundTrips := metrics.ActualRoundTrips(); actualRoundTrips > 3 {
t.Fatalf("Unnexpected actualRoundTrips: %v", actualRoundTrips)
}
if originalRequestWins := metrics.OriginalRequestWins(); originalRequestWins != 1 {
t.Fatalf("Unnexpected actualRoundTrips: %v", originalRequestWins)
}
if hedgedRequestWins := metrics.HedgedRequestWins(); hedgedRequestWins != 0 {
t.Fatalf("Unnexpected actualRoundTrips: %v", hedgedRequestWins)
}
if failedRoundTrips := metrics.FailedRoundTrips(); failedRoundTrips != 0 {
t.Fatalf("Unnexpected failedRoundTrips: %v", failedRoundTrips)
}
if canceledByUserRoundTrips := metrics.CanceledByUserRoundTrips(); canceledByUserRoundTrips != 0 {
t.Fatalf("Unnexpected canceledByUserRoundTrips: %v", canceledByUserRoundTrips)
}
if canceledSubRequests := metrics.CanceledSubRequests(); canceledSubRequests > 4 {
t.Fatalf("Unnexpected canceledSubRequests: %v", canceledSubRequests)
}
}

func TestHedgedResponseWins(t *testing.T) {
const shortest = 20 * time.Millisecond
timeouts := [...]time.Duration{30 * shortest, 5 * shortest, shortest, shortest, shortest}
timeoutCh := make(chan time.Duration, len(timeouts))
for _, t := range timeouts {
timeoutCh <- t
}

url := testServerURL(t, func(w http.ResponseWriter, r *http.Request) {
time.Sleep(<-timeoutCh)
})

start := time.Now()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
if err != nil {
t.Fatal(err)
}
client, metrics, err := hedgedhttp.NewClientAndStats(10*time.Millisecond, 5, nil)
if err != nil {
t.Fatalf("want nil, got %s", err)
}

checkAllMetricsAreZero(t, metrics)
resp, err := client.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

passed := time.Since(start)

if float64(passed) > float64(shortest)*2.5 {
t.Fatalf("want %v, got %v", shortest, passed)
}
if requestedRoundTrips := metrics.RequestedRoundTrips(); requestedRoundTrips != 1 {
t.Fatalf("Unnexpected requestedRoundTrips: %v", requestedRoundTrips)
}
if actualRoundTrips := metrics.ActualRoundTrips(); actualRoundTrips != 5 {
t.Fatalf("Unnexpected actualRoundTrips: %v", actualRoundTrips)
}
if originalRequestWins := metrics.OriginalRequestWins(); originalRequestWins != 0 {
t.Fatalf("Unnexpected actualRoundTrips: %v", originalRequestWins)
}
if hedgedRequestWins := metrics.HedgedRequestWins(); hedgedRequestWins != 1 {
t.Fatalf("Unnexpected actualRoundTrips: %v", hedgedRequestWins)
}
if failedRoundTrips := metrics.FailedRoundTrips(); failedRoundTrips != 0 {
t.Fatalf("Unnexpected failedRoundTrips: %v", failedRoundTrips)
}
if canceledByUserRoundTrips := metrics.CanceledByUserRoundTrips(); canceledByUserRoundTrips != 0 {
t.Fatalf("Unnexpected canceledByUserRoundTrips: %v", canceledByUserRoundTrips)
}
if canceledSubRequests := metrics.CanceledSubRequests(); canceledSubRequests > 4 {
t.Fatalf("Unnexpected canceledSubRequests: %v", canceledSubRequests)
}
}

func TestGetSuccessEvenWithErrorsPresent(t *testing.T) {
var gotRequests uint64

Expand Down
14 changes: 14 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type Stats struct {
requestedRoundTrips atomicCounter
actualRoundTrips atomicCounter
failedRoundTrips atomicCounter
originalRequestWins atomicCounter
hedgedRequestWins atomicCounter
canceledByUserRoundTrips atomicCounter
canceledSubRequests atomicCounter
_ cacheLine
Expand All @@ -24,6 +26,8 @@ type Stats struct {
func (s *Stats) requestedRoundTripsInc() { atomic.AddUint64(&s.requestedRoundTrips.count, 1) }
func (s *Stats) actualRoundTripsInc() { atomic.AddUint64(&s.actualRoundTrips.count, 1) }
func (s *Stats) failedRoundTripsInc() { atomic.AddUint64(&s.failedRoundTrips.count, 1) }
func (s *Stats) originalRequestWinsInc() { atomic.AddUint64(&s.originalRequestWins.count, 1) }
func (s *Stats) hedgedRequestWinsInc() { atomic.AddUint64(&s.hedgedRequestWins.count, 1) }
func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) }
func (s *Stats) canceledSubRequestsInc() { atomic.AddUint64(&s.canceledSubRequests.count, 1) }

Expand All @@ -42,6 +46,16 @@ func (s *Stats) FailedRoundTrips() uint64 {
return atomic.LoadUint64(&s.failedRoundTrips.count)
}

// OriginalRequestWins returns count of original requests that were faster than the original.
func (s *Stats) OriginalRequestWins() uint64 {
return atomic.LoadUint64(&s.originalRequestWins.count)
}

// HedgedRequestWins returns count of hedged requests that were faster than the original.
func (s *Stats) HedgedRequestWins() uint64 {
return atomic.LoadUint64(&s.hedgedRequestWins.count)
}

// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context.
func (s *Stats) CanceledByUserRoundTrips() uint64 {
return atomic.LoadUint64(&s.canceledByUserRoundTrips.count)
Expand Down

0 comments on commit 6504b04

Please sign in to comment.