From 0fee34b645ca9dff91eb2d992bb62ec458408538 Mon Sep 17 00:00:00 2001 From: Sam Stokes <35908605+bitwiseguy@users.noreply.github.com> Date: Tue, 24 Sep 2024 08:50:57 -0400 Subject: [PATCH] op-service: add configurable client timeout (#12074) * op-service: add configurable timeouts * op-service: fix lazy_dial --- op-service/client/lazy_dial.go | 3 ++- op-service/client/rpc.go | 34 ++++++++++++++++++++++++----- op-service/sources/engine_client.go | 27 ++++++++++++++++------- 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/op-service/client/lazy_dial.go b/op-service/client/lazy_dial.go index f5872778da9e..9a4e7cf6d872 100644 --- a/op-service/client/lazy_dial.go +++ b/op-service/client/lazy_dial.go @@ -49,7 +49,7 @@ func (l *LazyRPC) dial(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to dial: %w", err) } - l.inner = &BaseRPCClient{c: underlying} + l.inner = NewBaseRPCClient(underlying) return nil } @@ -66,6 +66,7 @@ func (l *LazyRPC) CallContext(ctx context.Context, result any, method string, ar if err := l.dial(ctx); err != nil { return err } + fmt.Println("checkpoin 1") return l.inner.CallContext(ctx, result, method, args...) } diff --git a/op-service/client/rpc.go b/op-service/client/rpc.go index f1452c57f0db..8fb2d4d37b88 100644 --- a/op-service/client/rpc.go +++ b/op-service/client/rpc.go @@ -36,10 +36,26 @@ type rpcConfig struct { limit float64 burst int lazy bool + callTimeout time.Duration + batchCallTimeout time.Duration } type RPCOption func(cfg *rpcConfig) error +func WithCallTimeout(d time.Duration) RPCOption { + return func(cfg *rpcConfig) error { + cfg.callTimeout = d + return nil + } +} + +func WithBatchCallTimeout(d time.Duration) RPCOption { + return func(cfg *rpcConfig) error { + cfg.batchCallTimeout = d + return nil + } +} + // WithDialBackoff configures the number of attempts for the initial dial to the RPC, // attempts are executed with an exponential backoff strategy. func WithDialBackoff(attempts int) RPCOption { @@ -98,6 +114,12 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) if cfg.backoffAttempts < 1 { // default to at least 1 attempt, or it always fails to dial. cfg.backoffAttempts = 1 } + if cfg.callTimeout == 0 { + cfg.callTimeout = 10 * time.Second + } + if cfg.batchCallTimeout == 0 { + cfg.batchCallTimeout = 20 * time.Second + } var wrapped RPC if cfg.lazy { @@ -107,7 +129,7 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) if err != nil { return nil, err } - wrapped = &BaseRPCClient{c: underlying} + wrapped = &BaseRPCClient{c: underlying, callTimeout: cfg.callTimeout, batchCallTimeout: cfg.batchCallTimeout} } if cfg.limit != 0 { @@ -171,11 +193,13 @@ func IsURLAvailable(ctx context.Context, address string) bool { // with the client.RPC interface. // It sets a timeout of 10s on CallContext & 20s on BatchCallContext made through it. type BaseRPCClient struct { - c *rpc.Client + c *rpc.Client + batchCallTimeout time.Duration + callTimeout time.Duration } func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient { - return &BaseRPCClient{c: c} + return &BaseRPCClient{c: c, callTimeout: 10 * time.Second, batchCallTimeout: 20 * time.Second} } func (b *BaseRPCClient) Close() { @@ -183,13 +207,13 @@ func (b *BaseRPCClient) Close() { } func (b *BaseRPCClient) CallContext(ctx context.Context, result any, method string, args ...any) error { - cCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + cCtx, cancel := context.WithTimeout(ctx, b.callTimeout) defer cancel() return b.c.CallContext(cCtx, result, method, args...) } func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error { - cCtx, cancel := context.WithTimeout(ctx, 20*time.Second) + cCtx, cancel := context.WithTimeout(ctx, b.batchCallTimeout) defer cancel() return b.c.BatchCallContext(cCtx, batch) } diff --git a/op-service/sources/engine_client.go b/op-service/sources/engine_client.go index 07d7ecb9d271..79921192f9e7 100644 --- a/op-service/sources/engine_client.go +++ b/op-service/sources/engine_client.go @@ -50,9 +50,10 @@ func NewEngineClient(client client.RPC, log log.Logger, metrics caching.Metrics, // EngineAPIClient is an RPC client for the Engine API functions. type EngineAPIClient struct { - RPC client.RPC - log log.Logger - evp EngineVersionProvider + RPC client.RPC + log log.Logger + evp EngineVersionProvider + timeout time.Duration } type EngineVersionProvider interface { @@ -63,9 +64,19 @@ type EngineVersionProvider interface { func NewEngineAPIClient(rpc client.RPC, l log.Logger, evp EngineVersionProvider) *EngineAPIClient { return &EngineAPIClient{ - RPC: rpc, - log: l, - evp: evp, + RPC: rpc, + log: l, + evp: evp, + timeout: time.Second * 5, + } +} + +func NewEngineAPIClientWithTimeout(rpc client.RPC, l log.Logger, evp EngineVersionProvider, timeout time.Duration) *EngineAPIClient { + return &EngineAPIClient{ + RPC: rpc, + log: l, + evp: evp, + timeout: timeout, } } @@ -84,7 +95,7 @@ func (s *EngineAPIClient) ForkchoiceUpdate(ctx context.Context, fc *eth.Forkchoi llog := s.log.New("state", fc) // local logger tlog := llog.New("attr", attributes) // trace logger tlog.Trace("Sharing forkchoice-updated signal") - fcCtx, cancel := context.WithTimeout(ctx, time.Second*5) + fcCtx, cancel := context.WithTimeout(ctx, s.timeout) defer cancel() var result eth.ForkchoiceUpdatedResult method := s.evp.ForkchoiceUpdatedVersion(attributes) @@ -120,7 +131,7 @@ func (s *EngineAPIClient) NewPayload(ctx context.Context, payload *eth.Execution e := s.log.New("block_hash", payload.BlockHash) e.Trace("sending payload for execution") - execCtx, cancel := context.WithTimeout(ctx, time.Second*5) + execCtx, cancel := context.WithTimeout(ctx, s.timeout) defer cancel() var result eth.PayloadStatusV1