Skip to content

Commit

Permalink
Merge pull request #158 from kaleido-io/refactor-retry-config
Browse files Browse the repository at this point in the history
Restructure block listener retry configuration
  • Loading branch information
EnriqueL8 authored Dec 11, 2024
2 parents aa3efed + 3d2c95e commit c80b0f3
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 36 deletions.
16 changes: 12 additions & 4 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|dataFormat|Configure the JSON data format for query output and events|map,flat_array,self_describing|`map`
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|gasEstimationFactor|The factor to apply to the gas estimation to determine the gas limit|float|`1.5`
|gasEstimationFactor|The factor to apply to the gas estimation to determine the gas limit|`float32`|`1.5`
|headers|Adds custom headers to HTTP requests|`map[string]string`|`<nil>`
|hederaCompatibilityMode|Compatibility mode for Hedera, allowing non-standard block header hashes to be processed|`boolean`|`false`
|idleTimeout|The max duration to hold a HTTP keepalive connection between calls|[`time.Duration`](https://pkg.go.dev/time#Duration)|`475ms`
Expand Down Expand Up @@ -108,17 +108,25 @@
|---|-----------|----|-------------|
|url|Optional HTTP proxy server to connect through|`string`|`<nil>`

## connector.queryLoopRetry

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|factor|Factor to increase the delay by, between each query request retry to the RPC endpoint, applicable to all the query loops|`float32`|`2`
|initialDelay|Initial delay for retrying query requests to the RPC endpoint, applicable to all the query loops|[`time.Duration`](https://pkg.go.dev/time#Duration)|`100ms`
|maxDelay|Maximum delay for between each query request retry to the RPC endpoint, applicable to all the query loops|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`

## connector.retry

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|count|The maximum number of times to retry|`int`|`5`
|enabled|Enables retries|`boolean`|`false`
|errorStatusCodeRegex|The regex that the error response status code must match to trigger retry|`string`|`<nil>`
|factor|The retry backoff factor|`float32`|`2`
|factor|<Deprecated> Please refer to 'connector.queryLoopRetry.factor' to understand its original purpose and use that instead|`float32`|`<nil>`
|initWaitTime|The initial retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|initialDelay|The initial retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`100ms`
|maxDelay|The maximum retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|initialDelay|<Deprecated> Please refer to 'connector.queryLoopRetry.initialDelay' to understand its original purpose and use that instead|[`time.Duration`](https://pkg.go.dev/time#Duration)|`<nil>`
|maxDelay|<Deprecated> Please refer to 'connector.queryLoopRetry.maxDelay' to understand its original purpose and use that instead|[`time.Duration`](https://pkg.go.dev/time#Duration)|`<nil>`
|maxWaitTime|The maximum retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`

## connector.throttle
Expand Down
24 changes: 16 additions & 8 deletions internal/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,19 @@ const (
EventsCheckpointBlockGap = "events.checkpointBlockGap"
EventsBlockTimestamps = "events.blockTimestamps"
EventsFilterPollingInterval = "events.filterPollingInterval"
RetryInitDelay = "retry.initialDelay"
RetryMaxDelay = "retry.maxDelay"
RetryFactor = "retry.factor"
MaxConcurrentRequests = "maxConcurrentRequests"
TxCacheSize = "txCacheSize"
HederaCompatibilityMode = "hederaCompatibilityMode"
TraceTXForRevertReason = "traceTXForRevertReason"
WebSocketsEnabled = "ws.enabled"
RetryInitDelay = "queryLoopRetry.initialDelay"
RetryMaxDelay = "queryLoopRetry.maxDelay"
RetryFactor = "queryLoopRetry.factor"

DeprecatedRetryInitDelay = "retry.initialDelay"
DeprecatedRetryMaxDelay = "retry.maxDelay"
DeprecatedRetryFactor = "retry.factor"

MaxConcurrentRequests = "maxConcurrentRequests"
TxCacheSize = "txCacheSize"
HederaCompatibilityMode = "hederaCompatibilityMode"
TraceTXForRevertReason = "traceTXForRevertReason"
WebSocketsEnabled = "ws.enabled"
)

const (
Expand Down Expand Up @@ -72,6 +77,9 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(RetryFactor, DefaultRetryDelayFactor)
conf.AddKnownKey(RetryInitDelay, DefaultRetryInitDelay)
conf.AddKnownKey(RetryMaxDelay, DefaultRetryMaxDelay)
conf.AddKnownKey(DeprecatedRetryFactor)
conf.AddKnownKey(DeprecatedRetryInitDelay)
conf.AddKnownKey(DeprecatedRetryMaxDelay)
conf.AddKnownKey(MaxConcurrentRequests, 50)
conf.AddKnownKey(TxCacheSize, 250)
conf.AddKnownKey(HederaCompatibilityMode, false)
Expand Down
25 changes: 20 additions & 5 deletions internal/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,27 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc ffcapi.A
eventBlockTimestamps: conf.GetBool(EventsBlockTimestamps),
eventFilterPollingInterval: conf.GetDuration(EventsFilterPollingInterval),
traceTXForRevertReason: conf.GetBool(TraceTXForRevertReason),
retry: &retry.Retry{
InitialDelay: conf.GetDuration(RetryInitDelay),
MaximumDelay: conf.GetDuration(RetryMaxDelay),
Factor: conf.GetFloat64(RetryFactor),
},
retry: &retry.Retry{},
}

if !conf.IsSet(DeprecatedRetryInitDelay) || (conf.IsSet(DeprecatedRetryInitDelay) && conf.IsSet(RetryInitDelay)) {
c.retry.InitialDelay = conf.GetDuration(RetryInitDelay)
} else {
c.retry.InitialDelay = conf.GetDuration(DeprecatedRetryInitDelay)
}

if !conf.IsSet(DeprecatedRetryFactor) || (conf.IsSet(DeprecatedRetryFactor) && conf.IsSet(RetryFactor)) {
c.retry.Factor = conf.GetFloat64(RetryFactor)
} else {
c.retry.Factor = conf.GetFloat64(DeprecatedRetryFactor)
}

if !conf.IsSet(DeprecatedRetryMaxDelay) || (conf.IsSet(DeprecatedRetryMaxDelay) && conf.IsSet(RetryMaxDelay)) {
c.retry.MaximumDelay = conf.GetDuration(RetryMaxDelay)
} else {
c.retry.MaximumDelay = conf.GetDuration(DeprecatedRetryMaxDelay)
}

if c.catchupThreshold < c.catchupPageSize {
log.L(ctx).Warnf("Catchup threshold %d must be at least as large as the catchup page size %d (overridden to %d)", c.catchupThreshold, c.catchupPageSize, c.catchupPageSize)
c.catchupThreshold = c.catchupPageSize
Expand Down
40 changes: 40 additions & 0 deletions internal/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ethereum
import (
"context"
"testing"
"time"

"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/ffresty"
Expand Down Expand Up @@ -153,3 +154,42 @@ func TestConnectorInit(t *testing.T) {
cc, err = NewEthereumConnector(context.Background(), conf)
assert.Regexp(t, "FF23051", err)
}

// TODO: remove once deprecated fields are removed
func TestNewEthereumConnectorConfigDeprecates(t *testing.T) {
// Test deprecated fields
config.RootConfigReset()
conf := config.RootSection("unittest")
InitConfig(conf)
conf.Set(ffresty.HTTPConfigURL, "http://localhost:8545")

// check deprecates
conf.Set(DeprecatedRetryInitDelay, "100ms")
conf.Set(DeprecatedRetryFactor, 2.0)
conf.Set(DeprecatedRetryMaxDelay, "30s")
cc, err := NewEthereumConnector(context.Background(), conf)
assert.NoError(t, err)
assert.NotNil(t, cc)
assert.Equal(t, 100*time.Millisecond, cc.(*ethConnector).retry.InitialDelay)
assert.Equal(t, 2.0, cc.(*ethConnector).retry.Factor)
assert.Equal(t, 30*time.Second, cc.(*ethConnector).retry.MaximumDelay)
}

func TestNewEthereumConnectorConfig(t *testing.T) {
// Test deprecated fields
config.RootConfigReset()
conf := config.RootSection("unittest")
InitConfig(conf)
conf.Set(ffresty.HTTPConfigURL, "http://localhost:8545")

// check new values set
conf.Set(RetryInitDelay, "10s")
conf.Set(RetryFactor, 4.0)
conf.Set(RetryMaxDelay, "30s")
cc, err := NewEthereumConnector(context.Background(), conf)
assert.NoError(t, err)
assert.NotNil(t, cc)
assert.Equal(t, 10*time.Second, cc.(*ethConnector).retry.InitialDelay)
assert.Equal(t, 4.0, cc.(*ethConnector).retry.Factor)
assert.Equal(t, 30*time.Second, cc.(*ethConnector).retry.MaximumDelay)
}
6 changes: 3 additions & 3 deletions internal/ethereum/event_enricher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func (ee *eventEnricher) filterEnrichEthLog(ctx context.Context, f *eventFilter,
Signature: signature,
BlockHash: ethLog.BlockHash.String(),
TransactionHash: ethLog.TransactionHash.String(),
BlockNumber: fftypes.FFuint64(blockNumber),
TransactionIndex: fftypes.FFuint64(transactionIndex),
LogIndex: fftypes.FFuint64(logIndex),
BlockNumber: fftypes.FFuint64(ethLog.BlockNumber.BigInt().Uint64()),
TransactionIndex: fftypes.FFuint64(ethLog.TransactionIndex.BigInt().Uint64()),
LogIndex: fftypes.FFuint64(ethLog.LogIndex.BigInt().Uint64()),
Timestamp: timestamp,
},
Info: &info,
Expand Down
38 changes: 22 additions & 16 deletions internal/msgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,26 @@ var ffc = func(key, translation string, fieldType string) i18n.ConfigMessageKey

//revive:disable
var (
ConfigEthereumURL = ffc("config.connector.url", "URL of JSON/RPC endpoint for the Ethereum node/gateway", "string")
ConfigEthereumWSEnabled = ffc("config.connector.ws.enabled", "When true a WebSocket is established for block listening, in addition to the HTTP RPC connections used for other functions", i18n.BooleanType)
ConfigEthereumDataFormat = ffc("config.connector.dataFormat", "Configure the JSON data format for query output and events", "map,flat_array,self_describing")
ConfigEthereumGasEstimationFactor = ffc("config.connector.gasEstimationFactor", "The factor to apply to the gas estimation to determine the gas limit", "float")
ConfigBlockCacheSize = ffc("config.connector.blockCacheSize", "Maximum of blocks to hold in the block info cache", i18n.IntType)
ConfigBlockPollingInterval = ffc("config.connector.blockPollingInterval", "Interval for polling to check for new blocks", i18n.TimeDurationType)
ConfigEventsBlockTimestamps = ffc("config.connector.events.blockTimestamps", "Whether to include the block timestamps in the event information", i18n.BooleanType)
ConfigEventsCatchupPageSize = ffc("config.connector.events.catchupPageSize", "Number of blocks to query per poll when catching up to the head of the blockchain", i18n.IntType)
ConfigEventsCatchupThreshold = ffc("config.connector.events.catchupThreshold", "How many blocks behind the chain head an event stream or listener must be on startup, to enter catchup mode", i18n.IntType)
ConfigEventsCatchupDownscaleRegex = ffc("config.connector.events.catchupDownscaleRegex", "An error pattern to check for from JSON/RPC providers if they limit response sizes to eth_getLogs(). If an error is returned from eth_getLogs() and that error matches the configured pattern, the number of logs requested (catchupPageSize) will be reduced automatically.", "string")
ConfigEventsCheckpointBlockGap = ffc("config.connector.events.checkpointBlockGap", "The number of blocks at the head of the chain that should be considered unstable (could be dropped from the canonical chain after a re-org). Unless events with a full set of confirmations are detected, the restart checkpoint will this many blocks behind the chain head.", i18n.IntType)
ConfigEventsFilterPollingInterval = ffc("config.connector.events.filterPollingInterval", "The interval between polling calls to a filter, when checking for newly arrived events", i18n.TimeDurationType)
ConfigTxCacheSize = ffc("config.connector.txCacheSize", "Maximum of transactions to hold in the transaction info cache", i18n.IntType)
ConfigMaxConcurrentRequests = ffc("config.connector.maxConcurrentRequests", "Maximum of concurrent requests to be submitted to the blockchain", i18n.IntType)
ConfigHederaCompatibilityMode = ffc("config.connector.hederaCompatibilityMode", "Compatibility mode for Hedera, allowing non-standard block header hashes to be processed", i18n.BooleanType)
ConfigTraceTXForRevertReason = ffc("config.connector.traceTXForRevertReason", "Enable the use of transaction trace functions (e.g. debug_traceTransaction) to obtain transaction revert reasons. This can place a high load on the EVM client.", i18n.BooleanType)
_ = ffc("config.connector.url", "URL of JSON/RPC endpoint for the Ethereum node/gateway", "string")
_ = ffc("config.connector.ws.enabled", "When true a WebSocket is established for block listening, in addition to the HTTP RPC connections used for other functions", i18n.BooleanType)
_ = ffc("config.connector.dataFormat", "Configure the JSON data format for query output and events", "map,flat_array,self_describing")
_ = ffc("config.connector.gasEstimationFactor", "The factor to apply to the gas estimation to determine the gas limit", i18n.FloatType)
_ = ffc("config.connector.blockCacheSize", "Maximum of blocks to hold in the block info cache", i18n.IntType)
_ = ffc("config.connector.blockPollingInterval", "Interval for polling to check for new blocks", i18n.TimeDurationType)
_ = ffc("config.connector.queryLoopRetry.initialDelay", "Initial delay for retrying query requests to the RPC endpoint, applicable to all the query loops", i18n.TimeDurationType)
_ = ffc("config.connector.queryLoopRetry.factor", "Factor to increase the delay by, between each query request retry to the RPC endpoint, applicable to all the query loops", i18n.FloatType)
_ = ffc("config.connector.queryLoopRetry.maxDelay", "Maximum delay for between each query request retry to the RPC endpoint, applicable to all the query loops", i18n.TimeDurationType)
_ = ffc("config.connector.retry.initialDelay", "<Deprecated> Please refer to 'connector.queryLoopRetry.initialDelay' to understand its original purpose and use that instead", i18n.TimeDurationType)
_ = ffc("config.connector.retry.factor", "<Deprecated> Please refer to 'connector.queryLoopRetry.factor' to understand its original purpose and use that instead", i18n.FloatType)
_ = ffc("config.connector.retry.maxDelay", "<Deprecated> Please refer to 'connector.queryLoopRetry.maxDelay' to understand its original purpose and use that instead", i18n.TimeDurationType)
_ = ffc("config.connector.events.blockTimestamps", "Whether to include the block timestamps in the event information", i18n.BooleanType)
_ = ffc("config.connector.events.catchupPageSize", "Number of blocks to query per poll when catching up to the head of the blockchain", i18n.IntType)
_ = ffc("config.connector.events.catchupThreshold", "How many blocks behind the chain head an event stream or listener must be on startup, to enter catchup mode", i18n.IntType)
_ = ffc("config.connector.events.catchupDownscaleRegex", "An error pattern to check for from JSON/RPC providers if they limit response sizes to eth_getLogs(). If an error is returned from eth_getLogs() and that error matches the configured pattern, the number of logs requested (catchupPageSize) will be reduced automatically.", "string")
_ = ffc("config.connector.events.checkpointBlockGap", "The number of blocks at the head of the chain that should be considered unstable (could be dropped from the canonical chain after a re-org). Unless events with a full set of confirmations are detected, the restart checkpoint will this many blocks behind the chain head.", i18n.IntType)
_ = ffc("config.connector.events.filterPollingInterval", "The interval between polling calls to a filter, when checking for newly arrived events", i18n.TimeDurationType)
_ = ffc("config.connector.txCacheSize", "Maximum of transactions to hold in the transaction info cache", i18n.IntType)
_ = ffc("config.connector.maxConcurrentRequests", "Maximum of concurrent requests to be submitted to the blockchain", i18n.IntType)
_ = ffc("config.connector.hederaCompatibilityMode", "Compatibility mode for Hedera, allowing non-standard block header hashes to be processed", i18n.BooleanType)
_ = ffc("config.connector.traceTXForRevertReason", "Enable the use of transaction trace functions (e.g. debug_traceTransaction) to obtain transaction revert reasons. This can place a high load on the EVM client.", i18n.BooleanType)
)

0 comments on commit c80b0f3

Please sign in to comment.