Skip to content

Commit

Permalink
feat: Fallback to reference rpcs and record error metric
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidNix authored May 12, 2023
2 parents 05ab298 + 14d4fbd commit d3dd907
Show file tree
Hide file tree
Showing 11 changed files with 431 additions and 96 deletions.
44 changes: 29 additions & 15 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,18 @@ func Execute() {
// Register static metrics
registry.MustRegister(metrics.BuildStatic(cfg.Static.Gauges)...)

// Register reference rpc metrics
refMets := metrics.NewReferenceRPC()
registry.MustRegister(refMets.Metrics()...)

// Register cosmos chain metrics
cosmosMets := metrics.NewCosmos()
registry.MustRegister(cosmosMets.Metrics()...)

// Build all jobs
var jobs []metrics.Job

// Initialize Cosmos Rest jobs
// TODO(nix): Temporary. Will introduce fallback mechanism.
u, err := url.Parse(cfg.Cosmos[0].Rest[0].URL)
if err != nil {
panic(err)
}
// TODO(nix): Need different rest clients per chain. This hack prevents > 1 chain.
restClient := cosmos.NewRestClient(httpClient, *u)
restJobs := cosmos.BuildRestJobs(cosmosMets, restClient, cfg.Cosmos)
jobs = append(jobs, toJobs(restJobs)...)

// Initialize Cosmos validator jobs
valJobs := cosmos.BuildValidatorJobs(cosmosMets, restClient, cfg.Cosmos)
jobs = append(jobs, toJobs(valJobs)...)
cosmosJobs := buildCosmosJobs(cosmosMets, refMets, cfg)
jobs = append(jobs, cosmosJobs...)

// Configure error group with signal handling.
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
Expand Down Expand Up @@ -133,6 +125,28 @@ func logFatal(msg string, err error) {
os.Exit(1)
}

func buildCosmosJobs(cosmosMets *metrics.Cosmos, refMets *metrics.ReferenceRPC, cfg Config) (jobs []metrics.Job) {
// TODO(nix): Need different rest clients per chain. This hack prevents > 1 chain.
var urls []url.URL
for _, rest := range cfg.Cosmos[0].Rest {
u, err := url.Parse(rest.URL)
if err != nil {
logFatal("Failed to parse rest url", err)
}
urls = append(urls, *u)
}

const rpcType = "cosmos"
restClient := cosmos.NewRestClient(metrics.NewFallbackClient(httpClient, refMets, rpcType, urls))

restJobs := cosmos.BuildRestJobs(cosmosMets, restClient, cfg.Cosmos)
jobs = append(jobs, toJobs(restJobs)...)
valJobs := cosmos.BuildValidatorJobs(cosmosMets, restClient, cfg.Cosmos)
jobs = append(jobs, toJobs(valJobs)...)

return jobs
}

func toJobs[T metrics.Job](jobs []T) []metrics.Job {
result := make([]metrics.Job, len(jobs))
for i := range jobs {
Expand Down
3 changes: 2 additions & 1 deletion config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ static:
cosmos:
# The canonical chain id.
- chainID: cosmoshub-4
# Periodically polls REST API (aka LCD) for data such as block height. At least one REST url is required.
interval: 15s # Optional. How often to poll the REST API. Default is 15s.
# Periodically polls REST API (aka LCD) for data such as block height. At least one REST url is required.
# Order matters. The first url is used. If it fails, the next url is tried.
rest:
- url: https://api.cosmoshub.strange.love
- url: https://api-cosmoshub-ia.cosmosia.notional.ventures
Expand Down
24 changes: 8 additions & 16 deletions cosmos/rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,31 @@ package cosmos
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
)

type RestClient struct {
baseURL url.URL
httpDo func(req *http.Request) (*http.Response, error)
client HTTPClient
}

func NewRestClient(c *http.Client, baseURL url.URL) *RestClient {
type HTTPClient interface {
Get(ctx context.Context, path string) (*http.Response, error)
}

func NewRestClient(c HTTPClient) *RestClient {
return &RestClient{
baseURL: baseURL,
httpDo: c.Do,
client: c,
}
}

// response must be a pointer to a datatype (typically a struct)
func (c RestClient) get(ctx context.Context, url string, response any) error {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return fmt.Errorf("malformed request: %w", err)
}
req = req.WithContext(ctx)
resp, err := c.httpDo(req)
resp, err := c.client.Get(ctx, url)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errors.New(resp.Status)
}
err = json.NewDecoder(resp.Body).Decode(response)
if err != nil {
return fmt.Errorf("malformed json: %w", err)
Expand Down
3 changes: 1 addition & 2 deletions cosmos/rest_latest_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type Block struct {
// LatestBlock queries the latest block from the Cosmos REST API given the baseURL.
func (c RestClient) LatestBlock(ctx context.Context) (Block, error) {
var latestBlock Block
c.baseURL.Path = "/blocks/latest"
err := c.get(ctx, c.baseURL.String(), &latestBlock)
err := c.get(ctx, "/blocks/latest", &latestBlock)
return latestBlock, err
}
64 changes: 25 additions & 39 deletions cosmos/rest_latest_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"io"
"net/http"
"net/url"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -16,63 +15,50 @@ import (
//go:embed testdata/block.json
var blockFixture []byte

type mockHTTPClient struct {
GetFn func(ctx context.Context, path string) (*http.Response, error)
}

func (m mockHTTPClient) Get(ctx context.Context, path string) (*http.Response, error) {
if m.GetFn != nil {
return m.GetFn(ctx, path)
}
return nil, nil
}

func TestClient_LatestBlock(t *testing.T) {
// Ensures we aren't comparing against context.Background().
type dummy string // Passes lint
ctx := context.WithValue(context.Background(), dummy("foo"), dummy("bar"))
t.Parallel()

baseURL, err := url.Parse("https://api.example.com:443")
require.NoError(t, err)
ctx := context.Background()

t.Run("happy path", func(t *testing.T) {
client := NewRestClient(http.DefaultClient, *baseURL)
require.NotNil(t, client.httpDo)

client.httpDo = func(req *http.Request) (*http.Response, error) {
require.Same(t, ctx, req.Context())
require.Equal(t, "GET", req.Method)
require.Equal(t, "https://api.example.com:443/blocks/latest", req.URL.String())
var httpClient mockHTTPClient
httpClient.GetFn = func(ctx context.Context, path string) (*http.Response, error) {
require.NotNil(t, ctx)
require.Equal(t, "/blocks/latest", path)

return &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader(blockFixture)),
}, nil
}

client := NewRestClient(httpClient)
got, err := client.LatestBlock(ctx)
require.NoError(t, err)

require.NoError(t, err)
require.Equal(t, "15226219", got.Block.Header.Height)
})

t.Run("http error", func(t *testing.T) {
client := NewRestClient(http.DefaultClient, *baseURL)

client.httpDo = func(req *http.Request) (*http.Response, error) {
return nil, errors.New("http error")
}

_, err := client.LatestBlock(ctx)

require.Error(t, err)
require.EqualError(t, err, "http error")
})

t.Run("bad status code", func(t *testing.T) {
client := NewRestClient(http.DefaultClient, *baseURL)
require.NotNil(t, client.httpDo)

client.httpDo = func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: 500,
Status: "internal server error",
Body: io.NopCloser(bytes.NewReader(nil)),
}, nil
t.Run("error", func(t *testing.T) {
var httpClient mockHTTPClient
httpClient.GetFn = func(ctx context.Context, path string) (*http.Response, error) {
return nil, errors.New("boom")
}
client := NewRestClient(&httpClient)

_, err := client.LatestBlock(ctx)

require.Error(t, err)
require.EqualError(t, err, "internal server error")
require.EqualError(t, err, "boom")
})
}
4 changes: 2 additions & 2 deletions cosmos/rest_signing_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type SigningStatus struct {

// SigningStatus returns the signing status of a validator given the consensus address.
func (c RestClient) SigningStatus(ctx context.Context, consaddress string) (SigningStatus, error) {
c.baseURL.Path = path.Join("/cosmos/slashing/v1beta1/signing_infos", consaddress)
p := path.Join("/cosmos/slashing/v1beta1/signing_infos", consaddress)
var status SigningStatus
err := c.get(ctx, c.baseURL.String(), &status)
err := c.get(ctx, p, &status)
return status, err
}
37 changes: 16 additions & 21 deletions cosmos/rest_signing_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"io"
"net/http"
"net/url"
"strings"
"testing"
"time"
Expand All @@ -15,31 +14,27 @@ import (
func TestRestClient_SigningStatus(t *testing.T) {
t.Parallel()

baseURL, err := url.Parse("https://api.example.com")
require.NoError(t, err)

client := NewRestClient(nil, *baseURL)

client.httpDo = func(req *http.Request) (*http.Response, error) {
require.Equal(t, "GET", req.Method)
require.Equal(t, "https://api.example.com/cosmos/slashing/v1beta1/signing_infos/cosmosvalcons123", req.URL.String())

const response = `{
"val_signing_info": {
"address": "",
"start_height": "0",
"index_offset": "6958718",
"jailed_until": "2021-11-07T03:19:15.865885008Z",
"tombstoned": true,
"missed_blocks_counter": "9"
}
var httpClient mockHTTPClient
httpClient.GetFn = func(ctx context.Context, path string) (*http.Response, error) {
require.NotNil(t, ctx)
require.Equal(t, "/cosmos/slashing/v1beta1/signing_infos/cosmosvalcons123", path)

const fixture = `{
"val_signing_info": {
"address": "",
"start_height": "0",
"index_offset": "6958718",
"jailed_until": "2021-11-07T03:19:15.865885008Z",
"tombstoned": true,
"missed_blocks_counter": "9"
}
}`
return &http.Response{
StatusCode: 200,
Body: io.NopCloser(strings.NewReader(response)),
Body: io.NopCloser(strings.NewReader(fixture)),
}, nil
}

client := NewRestClient(httpClient)
got, err := client.SigningStatus(context.Background(), "cosmosvalcons123")
require.NoError(t, err)

Expand Down
89 changes: 89 additions & 0 deletions metrics/fallback_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package metrics

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"

"golang.org/x/exp/slog"
)

type FallbackClient struct {
hosts []url.URL
httpDo func(req *http.Request) (*http.Response, error)
log *slog.Logger
metrics ClientMetrics
rpcType string
}

type ClientMetrics interface {
IncClientError(rpcType string, host url.URL, reason string)
// TODO(nix): Metrics for request counts. Latency histogram.
}

func NewFallbackClient(client *http.Client, metrics ClientMetrics, rpcType string, hosts []url.URL) *FallbackClient {
if len(hosts) == 0 {
panic("no hosts provided")
}
return &FallbackClient{
hosts: hosts,
httpDo: client.Do,
log: slog.Default(),
metrics: metrics,
rpcType: rpcType,
}
}

const unknownErrReason = "unknown"

func (c FallbackClient) Get(ctx context.Context, path string) (*http.Response, error) {
doGet := func(host url.URL) (*http.Response, error) {
log := c.log.With("host", host.Hostname(), "path", path, "rpc", c.rpcType)
host.Path = path
req, err := http.NewRequestWithContext(ctx, http.MethodGet, host.String(), nil)
if err != nil {
log.Error("Failed to create request", "error", err)
c.recordErrMetric(host, err)
return nil, err
}
resp, err := c.httpDo(req)
if err != nil {
log.Error("Failed to send request", "error", err)
c.recordErrMetric(host, err)
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
_ = resp.Body.Close()
log.Error("Response returned bad status code", "status", resp.StatusCode)
c.metrics.IncClientError(c.rpcType, host, strconv.Itoa(resp.StatusCode))
return nil, fmt.Errorf("%s: bad status code %d", req.URL, resp.StatusCode)
}
return resp, nil
}

var lastErr error
for _, host := range c.hosts {
resp, err := doGet(host)
if err != nil {
lastErr = err
continue
}
return resp, nil
}
return nil, lastErr
}

func (c FallbackClient) recordErrMetric(host url.URL, err error) {
reason := unknownErrReason
switch {
case errors.Is(err, context.DeadlineExceeded):
reason = "timeout"
case errors.Is(err, context.Canceled):
// Do not record when the process is shutting down.
return
}
c.metrics.IncClientError(c.rpcType, host, reason)
}
Loading

0 comments on commit d3dd907

Please sign in to comment.