Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handling connection fault on wsrpc LatestReport #13626

Merged
5 changes: 5 additions & 0 deletions .changeset/fast-dolphins-cry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

handle connection timeout on cache path for ws client LatestReport #bugfix
7 changes: 3 additions & 4 deletions core/services/relay/evm/mercury/wsrpc/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ type Fetcher interface {
}

type Client interface {
Fetcher
ServerURL() string
RawClient() pb.MercuryClient
RawLatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error)
}

// Cache is scoped to one particular mercury server
Expand Down Expand Up @@ -194,7 +193,7 @@ func (m *memCache) LatestReport(ctx context.Context, req *pb.LatestReportRequest
}
feedIDHex := mercuryutils.BytesToFeedID(req.FeedId).String()
if m.cfg.LatestReportTTL <= 0 {
return m.client.RawClient().LatestReport(ctx, req)
return m.client.RawLatestReport(ctx, req)
}
vi, loaded := m.cache.LoadOrStore(feedIDHex, &cacheVal{
sync.RWMutex{},
Expand Down Expand Up @@ -311,7 +310,7 @@ func (m *memCache) fetch(req *pb.LatestReportRequest, v *cacheVal) {
// NOTE: must drop down to RawClient here otherwise we enter an
// infinite loop of calling a client that calls back to this same cache
// and on and on
val, err = m.client.RawClient().LatestReport(ctx, req)
val, err = m.client.RawLatestReport(ctx, req)
cancel()
v.setError(err)
if memcacheCtx.Err() != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type mockClient struct {
err error
}

func (m *mockClient) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
func (m *mockClient) RawLatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
return m.resp, m.err
}

Expand Down
10 changes: 8 additions & 2 deletions core/services/relay/evm/mercury/wsrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ func (w *client) handleTimeout(err error) {
} else {
w.consecutiveTimeoutCnt.Store(0)
}

}

func (w *client) RawLatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
resp, err = w.rawClient.LatestReport(ctx, req)
w.handleTimeout(err)
return
}

func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest) (resp *pb.LatestReportResponse, err error) {
Expand All @@ -312,8 +319,7 @@ func (w *client) LatestReport(ctx context.Context, req *pb.LatestReportRequest)
}
var cached bool
if w.cache == nil {
resp, err = w.rawClient.LatestReport(ctx, req)
w.handleTimeout(err)
resp, err = w.RawLatestReport(ctx, req)
} else {
cached = true
resp, err = w.cache.LatestReport(ctx, req)
Expand Down
62 changes: 62 additions & 0 deletions core/services/relay/evm/mercury/wsrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,65 @@ func Test_Client_LatestReport(t *testing.T) {
})
}
}

func Test_Client_RawLatestReport(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)

t.Run("sends on reset channel after MaxConsecutiveRequestFailures timed out transmits", func(t *testing.T) {
noopCacheSet := newNoopCacheSet()
req := &pb.LatestReportRequest{}
calls := 0
timeoutErr := context.DeadlineExceeded
wsrpcClient := &mocks.MockWSRPCClient{
LatestReportF: func(ctx context.Context, in *pb.LatestReportRequest) (*pb.LatestReportResponse, error) {
calls++
return nil, timeoutErr
},
}
conn := &mocks.MockConn{
Ready: true,
}

c := newClient(lggr, csakey.KeyV2{}, nil, "", noopCacheSet)
c.conn = conn
c.rawClient = wsrpcClient
require.NoError(t, c.StartOnce("Mock WSRPC Client", func() error { return nil }))
for i := 1; i < MaxConsecutiveRequestFailures; i++ {
_, err := c.RawLatestReport(ctx, req)
require.EqualError(t, err, "context deadline exceeded")
}
assert.Equal(t, MaxConsecutiveRequestFailures-1, calls)
select {
case <-c.chResetTransport:
t.Fatal("unexpected send on chResetTransport")
default:
}
_, err := c.RawLatestReport(ctx, req)
require.EqualError(t, err, "context deadline exceeded")
assert.Equal(t, MaxConsecutiveRequestFailures, calls)
select {
case <-c.chResetTransport:
default:
t.Fatal("expected send on chResetTransport")
}

t.Run("successful LatestReport resets the counter", func(t *testing.T) {
timeoutErr = nil
// working LatestReport to reset counter
_, err := c.RawLatestReport(ctx, req)
require.NoError(t, err)
assert.Equal(t, MaxConsecutiveRequestFailures+1, calls)
assert.Equal(t, 0, int(c.consecutiveTimeoutCnt.Load()))
})

t.Run("doesn't block in case channel is full", func(t *testing.T) {
timeoutErr = context.DeadlineExceeded
c.chResetTransport = nil // simulate full channel
for i := 0; i < MaxConsecutiveRequestFailures; i++ {
_, err := c.RawLatestReport(ctx, req)
require.EqualError(t, err, "context deadline exceeded")
}
})
})
}
Loading