From 2d08d190a95fa8727cedafefb0b1e5a86ef4c8b6 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Wed, 23 Oct 2024 06:11:38 +1000 Subject: [PATCH] rollup-client: Increase call timeouts in CI (#12561) Refactors the op-service/client package a fair bit to ensure config options are more consistently applied. --- op-chain-ops/cmd/check-canyon/main.go | 2 +- op-e2e/system/e2esys/setup.go | 5 +- op-node/node/client.go | 4 +- op-node/node/server_test.go | 8 +- op-program/host/host.go | 4 +- op-service/client/lazy_dial.go | 28 +++---- op-service/client/lazy_dial_test.go | 2 +- op-service/client/rpc.go | 108 ++++++++++++++------------ op-service/dial/dial.go | 12 ++- 9 files changed, 97 insertions(+), 76 deletions(-) diff --git a/op-chain-ops/cmd/check-canyon/main.go b/op-chain-ops/cmd/check-canyon/main.go index deb5270ed639..d0f3bdb89d0e 100644 --- a/op-chain-ops/cmd/check-canyon/main.go +++ b/op-chain-ops/cmd/check-canyon/main.go @@ -232,7 +232,7 @@ func main() { // Parse the command-line arguments flag.Parse() - l2RPC, err := client.NewRPC(context.Background(), logger, rpcURL, client.WithDialBackoff(10)) + l2RPC, err := client.NewRPC(context.Background(), logger, rpcURL, client.WithDialAttempts(10)) if err != nil { log.Crit("Error creating RPC", "err", err) } diff --git a/op-e2e/system/e2esys/setup.go b/op-e2e/system/e2esys/setup.go index 17d973cb69b3..fc557aa1a932 100644 --- a/op-e2e/system/e2esys/setup.go +++ b/op-e2e/system/e2esys/setup.go @@ -1023,7 +1023,10 @@ func (sys *System) RollupClient(name string) *sources.RollupClient { require.NoError(sys.t, err, "failed to dial rollup instance %s", name) return cl }) - rollupClient = sources.NewRollupClient(client.NewBaseRPCClient(rpcClient)) + rollupClient = sources.NewRollupClient(client.NewBaseRPCClient(rpcClient, + // Increase timeouts because CI servers can be under a lot of load + client.WithCallTimeout(30*time.Second), + client.WithBatchCallTimeout(30*time.Second))) sys.rollupClients[name] = rollupClient return 
rollupClient } diff --git a/op-node/node/client.go b/op-node/node/client.go index a561a7678d24..796375a84365 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -66,7 +66,7 @@ func (cfg *L2EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf auth := rpc.WithHTTPAuth(gn.NewJWTAuth(cfg.L2EngineJWTSecret)) opts := []client.RPCOption{ client.WithGethRPCOptions(auth), - client.WithDialBackoff(10), + client.WithDialAttempts(10), } l2Node, err := client.NewRPC(ctx, log, cfg.L2EngineAddr, opts...) if err != nil { @@ -140,7 +140,7 @@ func (cfg *L1EndpointConfig) Check() error { func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCfg *rollup.Config) (client.RPC, *sources.L1ClientConfig, error) { opts := []client.RPCOption{ client.WithHttpPollInterval(cfg.HttpPollInterval), - client.WithDialBackoff(10), + client.WithDialAttempts(10), } if cfg.RateLimit != 0 { opts = append(opts, client.WithRateLimit(cfg.RateLimit, cfg.BatchSize)) diff --git a/op-node/node/server_test.go b/op-node/node/server_test.go index f8722e272318..49697e52694e 100644 --- a/op-node/node/server_test.go +++ b/op-node/node/server_test.go @@ -109,7 +109,7 @@ func TestOutputAtBlock(t *testing.T) { require.NoError(t, server.Stop(context.Background())) }() - client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3)) + client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialAttempts(3)) require.NoError(t, err) var out *eth.OutputResponse @@ -145,7 +145,7 @@ func TestVersion(t *testing.T) { require.NoError(t, server.Stop(context.Background())) }() - client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3)) + client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialAttempts(3)) assert.NoError(t, err) var out string @@ -191,7 
+191,7 @@ func TestSyncStatus(t *testing.T) { require.NoError(t, server.Stop(context.Background())) }() - client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3)) + client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialAttempts(3)) assert.NoError(t, err) var out *eth.SyncStatus @@ -234,7 +234,7 @@ func TestSafeHeadAtL1Block(t *testing.T) { require.NoError(t, server.Stop(context.Background())) }() - client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialBackoff(3)) + client, err := rpcclient.NewRPC(context.Background(), log, "http://"+server.Addr().String(), rpcclient.WithDialAttempts(3)) require.NoError(t, err) var out *eth.SafeHeadResponse diff --git a/op-program/host/host.go b/op-program/host/host.go index a60e451b972d..d12920275558 100644 --- a/op-program/host/host.go +++ b/op-program/host/host.go @@ -230,13 +230,13 @@ func makeDefaultPrefetcher(ctx context.Context, logger log.Logger, kv kvstore.KV return nil, nil } logger.Info("Connecting to L1 node", "l1", cfg.L1URL) - l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL, client.WithDialBackoff(10)) + l1RPC, err := client.NewRPC(ctx, logger, cfg.L1URL, client.WithDialAttempts(10)) if err != nil { return nil, fmt.Errorf("failed to setup L1 RPC: %w", err) } logger.Info("Connecting to L2 node", "l2", cfg.L2URL) - l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL, client.WithDialBackoff(10)) + l2RPC, err := client.NewRPC(ctx, logger, cfg.L2URL, client.WithDialAttempts(10)) if err != nil { return nil, fmt.Errorf("failed to setup L2 RPC: %w", err) } diff --git a/op-service/client/lazy_dial.go b/op-service/client/lazy_dial.go index 9064fbe1fe09..606bfac8f918 100644 --- a/op-service/client/lazy_dial.go +++ b/op-service/client/lazy_dial.go @@ -10,33 +10,33 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -// LazyRPC defers connection 
attempts to the usage of the RPC. +// lazyRPC defers connection attempts to the usage of the RPC. // This allows a websocket connection to be established lazily. // The underlying RPC should handle reconnects. -type LazyRPC struct { +type lazyRPC struct { // mutex to prevent more than one active dial attempt at a time. mu sync.Mutex // inner is the actual RPC client. // It is initialized once. The underlying RPC handles reconnections. inner RPC // options to initialize `inner` with. - opts []rpc.ClientOption + cfg rpcConfig endpoint string // If we have not initialized `inner` yet, // do not try to do so after closing the client. closed bool } -var _ RPC = (*LazyRPC)(nil) +var _ RPC = (*lazyRPC)(nil) -func NewLazyRPC(endpoint string, opts ...rpc.ClientOption) *LazyRPC { - return &LazyRPC{ - opts: opts, +func newLazyRPC(endpoint string, cfg rpcConfig) *lazyRPC { + return &lazyRPC{ + cfg: cfg, endpoint: endpoint, } } -func (l *LazyRPC) dial(ctx context.Context) error { +func (l *lazyRPC) dial(ctx context.Context) error { l.mu.Lock() defer l.mu.Unlock() if l.inner != nil { @@ -45,15 +45,15 @@ func (l *LazyRPC) dial(ctx context.Context) error { if l.closed { return errors.New("cannot dial RPC, client was already closed") } - underlying, err := rpc.DialOptions(ctx, l.endpoint, l.opts...) + underlying, err := rpc.DialOptions(ctx, l.endpoint, l.cfg.gethRPCOptions...) 
if err != nil { return fmt.Errorf("failed to dial: %w", err) } - l.inner = NewBaseRPCClient(underlying) + l.inner = wrapClient(underlying, l.cfg) return nil } -func (l *LazyRPC) Close() { +func (l *lazyRPC) Close() { l.mu.Lock() defer l.mu.Unlock() if l.inner != nil { @@ -62,21 +62,21 @@ func (l *LazyRPC) Close() { l.closed = true } -func (l *LazyRPC) CallContext(ctx context.Context, result any, method string, args ...any) error { +func (l *lazyRPC) CallContext(ctx context.Context, result any, method string, args ...any) error { if err := l.dial(ctx); err != nil { return err } return l.inner.CallContext(ctx, result, method, args...) } -func (l *LazyRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { +func (l *lazyRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { if err := l.dial(ctx); err != nil { return err } return l.inner.BatchCallContext(ctx, b) } -func (l *LazyRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { +func (l *lazyRPC) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { if err := l.dial(ctx); err != nil { return nil, err } diff --git a/op-service/client/lazy_dial_test.go b/op-service/client/lazy_dial_test.go index 79608dc2b236..7af05416e5fb 100644 --- a/op-service/client/lazy_dial_test.go +++ b/op-service/client/lazy_dial_test.go @@ -28,7 +28,7 @@ func TestLazyRPC(t *testing.T) { addr := listener.Addr().String() - cl := NewLazyRPC("ws://" + addr) + cl := newLazyRPC("ws://"+addr, applyOptions(nil)) defer cl.Close() // At this point the connection is online, but the RPC is not. 
diff --git a/op-service/client/rpc.go b/op-service/client/rpc.go index 8fb2d4d37b88..c37a4a53dd0e 100644 --- a/op-service/client/rpc.go +++ b/op-service/client/rpc.go @@ -8,9 +8,8 @@ import ( "regexp" "time" - "golang.org/x/time/rate" - "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/log" @@ -38,77 +37,92 @@ type rpcConfig struct { lazy bool callTimeout time.Duration batchCallTimeout time.Duration + fixedDialBackoff time.Duration } -type RPCOption func(cfg *rpcConfig) error +type RPCOption func(cfg *rpcConfig) func WithCallTimeout(d time.Duration) RPCOption { - return func(cfg *rpcConfig) error { + return func(cfg *rpcConfig) { cfg.callTimeout = d - return nil } } func WithBatchCallTimeout(d time.Duration) RPCOption { - return func(cfg *rpcConfig) error { + return func(cfg *rpcConfig) { cfg.batchCallTimeout = d - return nil } } -// WithDialBackoff configures the number of attempts for the initial dial to the RPC, -// attempts are executed with an exponential backoff strategy. -func WithDialBackoff(attempts int) RPCOption { - return func(cfg *rpcConfig) error { +// WithDialAttempts configures the number of attempts for the initial dial to the RPC, +// attempts are executed with an exponential backoff strategy by default. +func WithDialAttempts(attempts int) RPCOption { + return func(cfg *rpcConfig) { cfg.backoffAttempts = attempts - return nil + } +} + +// WithFixedDialBackoff makes the RPC client wait a fixed delay of d between dial attempts instead of using exponential backoff. +func WithFixedDialBackoff(d time.Duration) RPCOption { + return func(cfg *rpcConfig) { + cfg.fixedDialBackoff = d } } // WithHttpPollInterval configures the RPC to poll at the given rate, in case RPC subscriptions are not available. 
func WithHttpPollInterval(duration time.Duration) RPCOption { - return func(cfg *rpcConfig) error { + return func(cfg *rpcConfig) { cfg.httpPollInterval = duration - return nil } } // WithGethRPCOptions passes the list of go-ethereum RPC options to the internal RPC instance. func WithGethRPCOptions(gethRPCOptions ...rpc.ClientOption) RPCOption { - return func(cfg *rpcConfig) error { + return func(cfg *rpcConfig) { cfg.gethRPCOptions = append(cfg.gethRPCOptions, gethRPCOptions...) - return nil } } // WithRateLimit configures the RPC to target the given rate limit (in requests / second). // See NewRateLimitingClient for more details. func WithRateLimit(rateLimit float64, burst int) RPCOption { - return func(cfg *rpcConfig) error { + return func(cfg *rpcConfig) { cfg.limit = rateLimit cfg.burst = burst - return nil } } // WithLazyDial makes the RPC client initialization defer the initial connection attempt, // and defer to later RPC requests upon subsequent dial errors. // Any dial-backoff option will be ignored if this option is used. -// This is implemented by wrapping the inner RPC client with a LazyRPC. func WithLazyDial() RPCOption { - return func(cfg *rpcConfig) error { + return func(cfg *rpcConfig) { cfg.lazy = true - return nil } } // NewRPC returns the correct client.RPC instance for a given RPC url. 
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) (RPC, error) { - var cfg rpcConfig - for i, opt := range opts { - if err := opt(&cfg); err != nil { - return nil, fmt.Errorf("rpc option %d failed to apply to RPC config: %w", i, err) + cfg := applyOptions(opts) + + var wrapped RPC + if cfg.lazy { + wrapped = newLazyRPC(addr, cfg) + } else { + underlying, err := dialRPCClientWithBackoff(ctx, lgr, addr, cfg) + if err != nil { + return nil, err } + wrapped = wrapClient(underlying, cfg) + } + + return NewRPCWithClient(ctx, lgr, addr, wrapped, cfg.httpPollInterval) +} + +func applyOptions(opts []RPCOption) rpcConfig { + var cfg rpcConfig + for _, opt := range opts { + opt(&cfg) } if cfg.backoffAttempts < 1 { // default to at least 1 attempt, or it always fails to dial. @@ -120,23 +134,7 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) if cfg.batchCallTimeout == 0 { cfg.batchCallTimeout = 20 * time.Second } - - var wrapped RPC - if cfg.lazy { - wrapped = NewLazyRPC(addr, cfg.gethRPCOptions...) - } else { - underlying, err := dialRPCClientWithBackoff(ctx, lgr, addr, cfg.backoffAttempts, cfg.gethRPCOptions...) - if err != nil { - return nil, err - } - wrapped = &BaseRPCClient{c: underlying, callTimeout: cfg.callTimeout, batchCallTimeout: cfg.batchCallTimeout} - } - - if cfg.limit != 0 { - wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst) - } - - return NewRPCWithClient(ctx, lgr, addr, wrapped, cfg.httpPollInterval) + return cfg } // NewRPCWithClient builds a new polling client with the given underlying RPC client. @@ -148,14 +146,17 @@ func NewRPCWithClient(ctx context.Context, lgr log.Logger, addr string, underlyi } // Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional. 
-func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) { +func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, cfg rpcConfig) (*rpc.Client, error) { bOff := retry.Exponential() - return retry.Do(ctx, attempts, bOff, func() (*rpc.Client, error) { + if cfg.fixedDialBackoff != 0 { + bOff = retry.Fixed(cfg.fixedDialBackoff) + } + return retry.Do(ctx, cfg.backoffAttempts, bOff, func() (*rpc.Client, error) { if !IsURLAvailable(ctx, addr) { log.Warn("failed to dial address, but may connect later", "addr", addr) return nil, fmt.Errorf("address unavailable (%s)", addr) } - client, err := rpc.DialOptions(ctx, addr, opts...) + client, err := rpc.DialOptions(ctx, addr, cfg.gethRPCOptions...) if err != nil { return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err) } @@ -191,15 +192,26 @@ func IsURLAvailable(ctx context.Context, address string) bool { // BaseRPCClient is a wrapper around a concrete *rpc.Client instance to make it compliant // with the client.RPC interface. -// It sets a timeout of 10s on CallContext & 20s on BatchCallContext made through it. +// It sets a default timeout of 10s on CallContext & 20s on BatchCallContext made through it. 
type BaseRPCClient struct { c *rpc.Client batchCallTimeout time.Duration callTimeout time.Duration } -func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient { - return &BaseRPCClient{c: c, callTimeout: 10 * time.Second, batchCallTimeout: 20 * time.Second} +func NewBaseRPCClient(c *rpc.Client, opts ...RPCOption) RPC { + cfg := applyOptions(opts) + return wrapClient(c, cfg) +} + +func wrapClient(c *rpc.Client, cfg rpcConfig) RPC { + var wrapped RPC + wrapped = &BaseRPCClient{c: c, callTimeout: cfg.callTimeout, batchCallTimeout: cfg.batchCallTimeout} + + if cfg.limit != 0 { + wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst) + } + return wrapped } func (b *BaseRPCClient) Close() { diff --git a/op-service/dial/dial.go b/op-service/dial/dial.go index 4cf78f84fd21..ee7ca35e5882 100644 --- a/op-service/dial/dial.go +++ b/op-service/dial/dial.go @@ -35,16 +35,22 @@ func DialEthClientWithTimeout(ctx context.Context, timeout time.Duration, log lo // DialRollupClientWithTimeout attempts to dial the RPC provider using the provided URL. // If the dial doesn't complete within timeout seconds, this method will return an error. -func DialRollupClientWithTimeout(ctx context.Context, timeout time.Duration, log log.Logger, url string) (*sources.RollupClient, error) { +func DialRollupClientWithTimeout(ctx context.Context, timeout time.Duration, log log.Logger, url string, callerOpts ...client.RPCOption) (*sources.RollupClient, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - rpcCl, err := dialRPCClientWithBackoff(ctx, log, url) + opts := []client.RPCOption{ + client.WithFixedDialBackoff(defaultRetryTime), + client.WithDialAttempts(defaultRetryCount), + } + opts = append(opts, callerOpts...) + + rpcCl, err := client.NewRPC(ctx, log, url, opts...) 
if err != nil { return nil, err } - return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil + return sources.NewRollupClient(rpcCl), nil } // DialRPCClientWithTimeout attempts to dial the RPC provider using the provided URL.