From d3c22a6638d29bc03c16e14bd22bc26128269982 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 11 Jun 2024 17:12:39 -0400 Subject: [PATCH 1/2] WebSocket and subscription lifecycle Signed-off-by: Peter Broadhurst --- internal/ethereum/blocklistener.go | 62 ++++++++-- internal/ethereum/blocklistener_blockquery.go | 107 ++++++++++++++++++ internal/ethereum/blocklistener_test.go | 69 +++++++++++ internal/ethereum/config.go | 6 +- internal/ethereum/ethereum.go | 22 ++-- internal/ethereum/ethereum_test.go | 2 + internal/ethereum/event_listener.go | 2 +- internal/ethereum/get_block_info.go | 86 +------------- 8 files changed, 253 insertions(+), 103 deletions(-) create mode 100644 internal/ethereum/blocklistener_blockquery.go diff --git a/internal/ethereum/blocklistener.go b/internal/ethereum/blocklistener.go index 1facc7a..96696b1 100644 --- a/internal/ethereum/blocklistener.go +++ b/internal/ethereum/blocklistener.go @@ -22,10 +22,15 @@ import ( "sync" "time" + lru "github.com/hashicorp/golang-lru" "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" + "github.com/hyperledger/firefly-common/pkg/wsclient" + "github.com/hyperledger/firefly-evmconnect/internal/msgs" "github.com/hyperledger/firefly-signer/pkg/ethtypes" + "github.com/hyperledger/firefly-signer/pkg/rpcbackend" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" ) @@ -41,6 +46,8 @@ type blockUpdateConsumer struct { type blockListener struct { ctx context.Context c *ethConnector + backend rpcbackend.RPC + wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected listenLoopDone chan struct{} initialBlockHeightObtained chan struct{} highestBlock int64 @@ -50,6 +57,7 @@ type blockListener struct { unstableHeadLength int canonicalChain *list.List hederaCompatibilityMode bool + blockCache *lru.Cache } type minimalBlockInfo struct { @@ -58,10 +66,11 @@ type minimalBlockInfo struct { parentHash string } -func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section) *blockListener { - bl := &blockListener{ +func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section, wsConf *wsclient.WSConfig) (bl *blockListener, err error) { + bl = &blockListener{ ctx: log.WithLogField(ctx, "role", "blocklistener"), c: c, + backend: c.backend, // use the HTTP backend - might get overwritten by a connected websocket later initialBlockHeightObtained: make(chan struct{}), highestBlock: -1, consumers: make(map[fftypes.UUID]*blockUpdateConsumer), @@ -70,14 +79,46 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section) unstableHeadLength: int(c.checkpointBlockGap), hederaCompatibilityMode: conf.GetBool(HederaCompatibilityMode), } - return bl + if wsConf != nil { + bl.wsBackend = rpcbackend.NewWSRPCClient(wsConf) + } + bl.blockCache, err = lru.New(conf.GetInt(BlockCacheSize)) + if err != nil { + return nil, i18n.WrapError(ctx, err, msgs.MsgCacheInitFail, "block") + } + return bl, nil } // getBlockHeightWithRetry keeps retrying attempting to get the initial block height until successful func (bl *blockListener) establishBlockHeightWithRetry() error { + wsConnected := false return bl.c.retry.Do(bl.ctx, "get initial block height", func(attempt int) (retry bool, err error) { + + // If we have a WebSocket backend, then we connect it and switch over to using it + // (we accept an un-locked update here to backend, as the most important routine that's + // querying block state is the one we're called on) + if bl.wsBackend != nil { + if !wsConnected { + if err := bl.wsBackend.Connect(bl.ctx); err != nil { + log.L(bl.ctx).Warnf("WebSocket connection failed, blocking startup of block listener: %s", err) + return true, err + } + // if we retry subscribe, we don't want to retry connect + wsConnected = true + } + // Once subscribed the backend will keep us subscribed over reconnect + if _, rpcErr := bl.wsBackend.Subscribe(bl.ctx, "newHeads"); rpcErr != nil { + return true, rpcErr.Error() + } + // Ok all JSON/RPC from this point on uses our WS Backend, thus ensuring we're + // sticky to the same node that the WS is connected to when we're doing queries + // and building our cache. + bl.backend = bl.wsBackend + } + + // Now get the block heiht var hexBlockHeight ethtypes.HexInteger - rpcErr := bl.c.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber") + rpcErr := bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber") if rpcErr != nil { log.L(bl.ctx).Warnf("Block height could not be obtained: %s", rpcErr.Message) return true, rpcErr.Error() @@ -118,7 +159,7 @@ func (bl *blockListener) listenLoop() { } if filter == "" { - err := bl.c.backend.CallRPC(bl.ctx, &filter, "eth_newBlockFilter") + err := bl.backend.CallRPC(bl.ctx, &filter, "eth_newBlockFilter") if err != nil { log.L(bl.ctx).Errorf("Failed to establish new block filter: %s", err.Message) failCount++ @@ -127,7 +168,7 @@ func (bl *blockListener) listenLoop() { } var blockHashes []ethtypes.HexBytes0xPrefix - rpcErr := bl.c.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter) + rpcErr := bl.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter) if rpcErr != nil { if mapError(filterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound { log.L(bl.ctx).Warnf("Block filter '%v' no longer valid. Recreating filter: %s", filter, rpcErr.Message) @@ -159,7 +200,7 @@ func (bl *blockListener) listenLoop() { } // Do a lookup of the block (which will then go into our cache). - bi, err := bl.c.getBlockInfoByHash(bl.ctx, h.String()) + bi, err := bl.getBlockInfoByHash(bl.ctx, h.String()) switch { case err != nil: log.L(bl.ctx).Debugf("Failed to query block '%s': %s", h, err) @@ -312,7 +353,7 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element { var bi *blockInfoJSONRPC var reason ffcapi.ErrorReason err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(attempt int) (retry bool, err error) { - bi, reason, err = bl.c.getBlockInfoByNumber(bl.ctx, nextBlockNumber, false, "") + bi, reason, err = bl.getBlockInfoByNumber(bl.ctx, nextBlockNumber, false, "") return reason != ffcapi.ErrorReasonNotFound, err }) if err != nil { @@ -366,7 +407,7 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf var freshBlockInfo *blockInfoJSONRPC var reason ffcapi.ErrorReason err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(attempt int) (retry bool, err error) { - freshBlockInfo, reason, err = bl.c.getBlockInfoByNumber(bl.ctx, currentViewBlock.number, false, "") + freshBlockInfo, reason, err = bl.getBlockInfoByNumber(bl.ctx, currentViewBlock.number, false, "") return reason != ffcapi.ErrorReasonNotFound, err }) if err != nil { @@ -447,6 +488,9 @@ func (bl *blockListener) waitClosed() { bl.mux.Lock() listenLoopDone := bl.listenLoopDone bl.mux.Unlock() + if bl.wsBackend != nil { + bl.wsBackend.Close() + } if listenLoopDone != nil { <-listenLoopDone } diff --git a/internal/ethereum/blocklistener_blockquery.go b/internal/ethereum/blocklistener_blockquery.go new file mode 100644 index 0000000..e3e6666 --- /dev/null +++ b/internal/ethereum/blocklistener_blockquery.go @@ -0,0 +1,107 @@ +// Copyright © 2024 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ethereum + +import ( + "context" + "strconv" + + "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-common/pkg/i18n" + "github.com/hyperledger/firefly-common/pkg/log" + "github.com/hyperledger/firefly-evmconnect/internal/msgs" + "github.com/hyperledger/firefly-signer/pkg/ethtypes" + "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" +) + +// blockInfoJSONRPC are the info fields we parse from the JSON/RPC response, and cache +type blockInfoJSONRPC struct { + Number *ethtypes.HexInteger `json:"number"` + Hash ethtypes.HexBytes0xPrefix `json:"hash"` + ParentHash ethtypes.HexBytes0xPrefix `json:"parentHash"` + Timestamp *ethtypes.HexInteger `json:"timestamp"` + Transactions []ethtypes.HexBytes0xPrefix `json:"transactions"` +} + +func transformBlockInfo(bi *blockInfoJSONRPC, t *ffcapi.BlockInfo) { + t.BlockNumber = (*fftypes.FFBigInt)(bi.Number) + t.BlockHash = bi.Hash.String() + t.ParentHash = bi.ParentHash.String() + stringHashes := make([]string, len(bi.Transactions)) + for i, th := range bi.Transactions { + stringHashes[i] = th.String() + } + t.TransactionHashes = stringHashes +} + +func (bl *blockListener) addToBlockCache(blockInfo *blockInfoJSONRPC) { + bl.blockCache.Add(blockInfo.Hash.String(), blockInfo) + bl.blockCache.Add(blockInfo.Number.BigInt().String(), blockInfo) +} + +func (bl *blockListener) getBlockInfoByNumber(ctx context.Context, blockNumber int64, allowCache bool, expectedHashStr string) (*blockInfoJSONRPC, ffcapi.ErrorReason, error) { + var blockInfo *blockInfoJSONRPC + if allowCache { + cached, ok := bl.blockCache.Get(strconv.FormatInt(blockNumber, 10)) + if ok { + blockInfo = cached.(*blockInfoJSONRPC) + if expectedHashStr != "" && blockInfo.ParentHash.String() != expectedHashStr { + log.L(ctx).Debugf("Block cache miss for block %d due to mismatched parent hash expected=%s found=%s", blockNumber, expectedHashStr, blockInfo.ParentHash) + blockInfo = nil + } + } + } + + if blockInfo == nil { + rpcErr := bl.backend.CallRPC(ctx, &blockInfo, "eth_getBlockByNumber", ethtypes.NewHexInteger64(blockNumber), false /* only the txn hashes */) + if rpcErr != nil { + if mapError(blockRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound { + log.L(ctx).Debugf("Received error signifying 'block not found': '%s'", rpcErr.Message) + return nil, ffcapi.ErrorReasonNotFound, i18n.NewError(ctx, msgs.MsgBlockNotAvailable) + } + return nil, ffcapi.ErrorReason(""), rpcErr.Error() + } + if blockInfo == nil { + return nil, ffcapi.ErrorReason(""), nil + } + bl.addToBlockCache(blockInfo) + } + + return blockInfo, "", nil +} + +func (bl *blockListener) getBlockInfoByHash(ctx context.Context, hash0xString string) (*blockInfoJSONRPC, error) { + var blockInfo *blockInfoJSONRPC + cached, ok := bl.blockCache.Get(hash0xString) + if ok { + blockInfo = cached.(*blockInfoJSONRPC) + } + + if blockInfo == nil { + rpcErr := bl.backend.CallRPC(ctx, &blockInfo, "eth_getBlockByHash", hash0xString, false /* only the txn hashes */) + if rpcErr != nil || blockInfo == nil { + var err error + if rpcErr != nil { + err = rpcErr.Error() + } + return nil, err + } + bl.addToBlockCache(blockInfo) + } + + return blockInfo, nil +} diff --git a/internal/ethereum/blocklistener_test.go b/internal/ethereum/blocklistener_test.go index 8f1be9f..431ed97 100644 --- a/internal/ethereum/blocklistener_test.go +++ b/internal/ethereum/blocklistener_test.go @@ -18,10 +18,14 @@ package ethereum import ( "context" + "encoding/json" + "fmt" + "net/http" "testing" "time" "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-common/pkg/wsclient" "github.com/hyperledger/firefly-signer/pkg/ethtypes" "github.com/hyperledger/firefly-signer/pkg/rpcbackend" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" @@ -52,6 +56,71 @@ func TestBlockListenerStartGettingHighestBlockRetry(t *testing.T) { } +func TestBlockListenerStartGettingHighestBlockRetryWS(t *testing.T) { + + failedConnectOnce := false + failedSubOnce := false + toServer, fromServer, url, wsDone := wsclient.NewTestWSServer(func(req *http.Request) { + if !failedConnectOnce { + failedConnectOnce = true + panic("fail once here") + } + }) + + ctx, c, _, done := newTestConnector(t) + svrDone := make(chan struct{}) + defer func() { + wsDone() + done() + <-svrDone + }() + bl := c.blockListener + bl.wsBackend = rpcbackend.NewWSRPCClient(&wsclient.WSConfig{ + HTTPURL: url, // ensured to fail + }) + + go func() { + defer close(svrDone) + for { + select { + case rpcStr := <-toServer: + var rpcReq rpcbackend.RPCRequest + err := json.Unmarshal([]byte(rpcStr), &rpcReq) + assert.NoError(t, err) + rpcRes := &rpcbackend.RPCResponse{ + JSONRpc: rpcReq.JSONRpc, + ID: rpcReq.ID, + } + if rpcReq.Method == "eth_blockNumber" { + rpcRes.Result = fftypes.JSONAnyPtr(`"0x12345"`) + } else { + assert.Equal(t, "newHeads", rpcReq.Params[0].AsString()) + if !failedSubOnce { + failedSubOnce = true + rpcRes.Error = &rpcbackend.RPCError{ + Code: int64(rpcbackend.RPCCodeInternalError), + Message: "pop", + } + } else { + rpcRes.Result = fftypes.JSONAnyPtr(fmt.Sprintf(`"%s"`, fftypes.NewUUID())) + } + } + b, err := json.Marshal(rpcRes) + assert.NoError(t, err) + fromServer <- string(b) + case <-ctx.Done(): + return + } + } + }() + + err := bl.establishBlockHeightWithRetry() + assert.NoError(t, err) + assert.True(t, failedConnectOnce) + assert.True(t, failedSubOnce) + +} + func TestBlockListenerStartGettingHighestBlockFailBeforeStop(t *testing.T) { _, c, mRPC, done := newTestConnector(t) diff --git a/internal/ethereum/config.go b/internal/ethereum/config.go index 30fdd9e..0cc1c09 100644 --- a/internal/ethereum/config.go +++ b/internal/ethereum/config.go @@ -18,7 +18,7 @@ package ethereum import ( "github.com/hyperledger/firefly-common/pkg/config" - "github.com/hyperledger/firefly-common/pkg/ffresty" + "github.com/hyperledger/firefly-common/pkg/wsclient" ) const ( @@ -39,6 +39,7 @@ const ( TxCacheSize = "txCacheSize" HederaCompatibilityMode = "hederaCompatibilityMode" TraceTXForRevertReason = "traceTXForRevertReason" + WebSocketsEnabled = "ws.enabled" ) const ( @@ -56,7 +57,8 @@ const ( ) func InitConfig(conf config.Section) { - ffresty.InitConfig(conf) + wsclient.InitConfig(conf) + conf.AddKnownKey(WebSocketsEnabled, false) conf.AddKnownKey(BlockCacheSize, 250) conf.AddKnownKey(BlockPollingInterval, "1s") conf.AddKnownKey(ConfigDataFormat, "map") diff --git a/internal/ethereum/ethereum.go b/internal/ethereum/ethereum.go index 7c49d8a..c2b5ee9 100644 --- a/internal/ethereum/ethereum.go +++ b/internal/ethereum/ethereum.go @@ -31,6 +31,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly-common/pkg/retry" + "github.com/hyperledger/firefly-common/pkg/wsclient" "github.com/hyperledger/firefly-evmconnect/internal/msgs" "github.com/hyperledger/firefly-signer/pkg/abi" "github.com/hyperledger/firefly-signer/pkg/rpcbackend" @@ -53,7 +54,6 @@ type ethConnector struct { mux sync.Mutex eventStreams map[fftypes.UUID]*eventStream - blockCache *lru.Cache txCache *lru.Cache } @@ -76,10 +76,6 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc ffcapi.A log.L(ctx).Warnf("Catchup threshold %d must be at least as large as the catchup page size %d (overridden to %d)", c.catchupThreshold, c.catchupPageSize, c.catchupPageSize) c.catchupThreshold = c.catchupPageSize } - c.blockCache, err = lru.New(conf.GetInt(BlockCacheSize)) - if err != nil { - return nil, i18n.WrapError(ctx, err, msgs.MsgCacheInitFail, "block") - } c.txCache, err = lru.New(conf.GetInt(TxCacheSize)) if err != nil { @@ -96,10 +92,20 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc ffcapi.A return nil, i18n.WrapError(ctx, err, msgs.MsgInvalidRegex, c.catchupDownscaleRegex) } - httpClient, err := ffresty.New(ctx, conf) + var wsConf *wsclient.WSConfig + var httpConf *ffresty.Config + if conf.GetBool(WebSocketsEnabled) { + // If websockets are enabled, then they are used selectively (block listening/query) + // not as a full replacement for HTTP. + wsConf, err = wsclient.GenerateConfig(ctx, conf) + } + if err == nil { + httpConf, err = ffresty.GenerateConfig(ctx, conf) + } if err != nil { return nil, err } + httpClient := ffresty.NewWithConfig(ctx, *httpConf) c.backend = rpcbackend.NewRPCClientWithOption(httpClient, rpcbackend.RPCClientOptions{ MaxConcurrentRequest: conf.GetInt64(MaxConcurrentRequests), }) @@ -123,7 +129,9 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc ffcapi.A return name }) - c.blockListener = newBlockListener(ctx, c, conf) + if c.blockListener, err = newBlockListener(ctx, c, conf, wsConf); err != nil { + return nil, err + } return c, nil } diff --git a/internal/ethereum/ethereum_test.go b/internal/ethereum/ethereum_test.go index 1f19ecf..013cc5a 100644 --- a/internal/ethereum/ethereum_test.go +++ b/internal/ethereum/ethereum_test.go @@ -46,6 +46,7 @@ func newTestConnector(t *testing.T) (context.Context, *ethConnector, *rpcbackend assert.NoError(t, err) c := cc.(*ethConnector) c.backend = mRPC + c.blockListener.backend = mRPC return ctx, c, mRPC, func() { done() mRPC.AssertExpectations(t) @@ -64,6 +65,7 @@ func TestConnectorInit(t *testing.T) { assert.Regexp(t, "FF23025", err) conf.Set(ffresty.HTTPConfigURL, "http://localhost:8545") + conf.Set(WebSocketsEnabled, true) conf.Set(EventsCatchupThreshold, 1) conf.Set(EventsCatchupPageSize, 500) conf.Set(EventsCatchupDownscaleRegex, "Response size is larger.*error.") diff --git a/internal/ethereum/event_listener.go b/internal/ethereum/event_listener.go index 4e03f16..d16aa7d 100644 --- a/internal/ethereum/event_listener.go +++ b/internal/ethereum/event_listener.go @@ -311,7 +311,7 @@ func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLo var timestamp *fftypes.FFTime if l.c.eventBlockTimestamps { - bi, err := l.c.getBlockInfoByHash(ctx, ethLog.BlockHash.String()) + bi, err := l.c.blockListener.getBlockInfoByHash(ctx, ethLog.BlockHash.String()) if bi == nil || err != nil { log.L(ctx).Errorf("Failed to get block info timestamp for block '%s': %v", ethLog.BlockHash, err) return nil, false, err // This is an error condition, rather than just something we cannot enrich diff --git a/internal/ethereum/get_block_info.go b/internal/ethereum/get_block_info.go index a7de4d6..d33015e 100644 --- a/internal/ethereum/get_block_info.go +++ b/internal/ethereum/get_block_info.go @@ -18,75 +18,15 @@ package ethereum import ( "context" - "strconv" - "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/i18n" - "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly-evmconnect/internal/msgs" - "github.com/hyperledger/firefly-signer/pkg/ethtypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" ) -// blockInfoJSONRPC are the info fields we parse from the JSON/RPC response, and cache -type blockInfoJSONRPC struct { - Number *ethtypes.HexInteger `json:"number"` - Hash ethtypes.HexBytes0xPrefix `json:"hash"` - ParentHash ethtypes.HexBytes0xPrefix `json:"parentHash"` - Timestamp *ethtypes.HexInteger `json:"timestamp"` - Transactions []ethtypes.HexBytes0xPrefix `json:"transactions"` -} - -func transformBlockInfo(bi *blockInfoJSONRPC, t *ffcapi.BlockInfo) { - t.BlockNumber = (*fftypes.FFBigInt)(bi.Number) - t.BlockHash = bi.Hash.String() - t.ParentHash = bi.ParentHash.String() - stringHashes := make([]string, len(bi.Transactions)) - for i, th := range bi.Transactions { - stringHashes[i] = th.String() - } - t.TransactionHashes = stringHashes -} - -func (c *ethConnector) addToBlockCache(blockInfo *blockInfoJSONRPC) { - c.blockCache.Add(blockInfo.Hash.String(), blockInfo) - c.blockCache.Add(blockInfo.Number.BigInt().String(), blockInfo) -} - -func (c *ethConnector) getBlockInfoByNumber(ctx context.Context, blockNumber int64, allowCache bool, expectedHashStr string) (*blockInfoJSONRPC, ffcapi.ErrorReason, error) { - var blockInfo *blockInfoJSONRPC - if allowCache { - cached, ok := c.blockCache.Get(strconv.FormatInt(blockNumber, 10)) - if ok { - blockInfo = cached.(*blockInfoJSONRPC) - if expectedHashStr != "" && blockInfo.ParentHash.String() != expectedHashStr { - log.L(ctx).Debugf("Block cache miss for block %d due to mismatched parent hash expected=%s found=%s", blockNumber, expectedHashStr, blockInfo.ParentHash) - blockInfo = nil - } - } - } - - if blockInfo == nil { - rpcErr := c.backend.CallRPC(ctx, &blockInfo, "eth_getBlockByNumber", ethtypes.NewHexInteger64(blockNumber), false /* only the txn hashes */) - if rpcErr != nil { - if mapError(blockRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound { - log.L(ctx).Debugf("Received error signifying 'block not found': '%s'", rpcErr.Message) - return nil, ffcapi.ErrorReasonNotFound, i18n.NewError(ctx, msgs.MsgBlockNotAvailable) - } - return nil, ffcapi.ErrorReason(""), rpcErr.Error() - } - if blockInfo == nil { - return nil, ffcapi.ErrorReason(""), nil - } - c.addToBlockCache(blockInfo) - } - - return blockInfo, "", nil -} - func (c *ethConnector) BlockInfoByNumber(ctx context.Context, req *ffcapi.BlockInfoByNumberRequest) (*ffcapi.BlockInfoByNumberResponse, ffcapi.ErrorReason, error) { - blockInfo, reason, err := c.getBlockInfoByNumber(ctx, req.BlockNumber.Int64(), true, req.ExpectedParentHash) + blockInfo, reason, err := c.blockListener.getBlockInfoByNumber(ctx, req.BlockNumber.Int64(), true, req.ExpectedParentHash) if err != nil { return nil, reason, err } @@ -99,31 +39,9 @@ func (c *ethConnector) BlockInfoByNumber(ctx context.Context, req *ffcapi.BlockI return res, "", nil } -func (c *ethConnector) getBlockInfoByHash(ctx context.Context, hash0xString string) (*blockInfoJSONRPC, error) { - var blockInfo *blockInfoJSONRPC - cached, ok := c.blockCache.Get(hash0xString) - if ok { - blockInfo = cached.(*blockInfoJSONRPC) - } - - if blockInfo == nil { - rpcErr := c.backend.CallRPC(ctx, &blockInfo, "eth_getBlockByHash", hash0xString, false /* only the txn hashes */) - if rpcErr != nil || blockInfo == nil { - var err error - if rpcErr != nil { - err = rpcErr.Error() - } - return nil, err - } - c.addToBlockCache(blockInfo) - } - - return blockInfo, nil -} - func (c *ethConnector) BlockInfoByHash(ctx context.Context, req *ffcapi.BlockInfoByHashRequest) (*ffcapi.BlockInfoByHashResponse, ffcapi.ErrorReason, error) { - blockInfo, err := c.getBlockInfoByHash(ctx, req.BlockHash) + blockInfo, err := c.blockListener.getBlockInfoByHash(ctx, req.BlockHash) if err != nil { return nil, ffcapi.ErrorReason(""), err } From 172cc404c68837bcc397a6128b83f6046044a46b Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 11 Jun 2024 21:07:17 -0400 Subject: [PATCH 2/2] Faster block detection using WebSocket notifications Signed-off-by: Peter Broadhurst --- config.md | 13 ++ internal/ethereum/blocklistener.go | 29 ++++- internal/ethereum/blocklistener_test.go | 157 ++++++++++++++---------- internal/ethereum/ethereum_test.go | 5 +- internal/msgs/en_config_descriptions.go | 1 + 5 files changed, 135 insertions(+), 70 deletions(-) diff --git a/config.md b/config.md index a324b65..cca5172 100644 --- a/config.md +++ b/config.md @@ -142,6 +142,19 @@ |keyFile|The path to the private key file for TLS on this API|`string`|`` |requiredDNAttributes|A set of required subject DN attributes. Each entry is a regular expression, and the subject certificate must have a matching attribute of the specified type (CN, C, O, OU, ST, L, STREET, POSTALCODE, SERIALNUMBER are valid attributes)|`map[string]string`|`` +## connector.ws + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|connectionTimeout|The amount of time to wait while establishing a connection (or auto-reconnection)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`45s` +|enabled|When true a WebSocket is established for block listening, in addition to the HTTP RPC connections used for other functions|`boolean`|`false` +|heartbeatInterval|The amount of time to wait between heartbeat signals on the WebSocket connection|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` +|initialConnectAttempts|The number of attempts FireFly will make to connect to the WebSocket when starting up, before failing|`int`|`5` +|path|The WebSocket sever URL to which FireFly should connect|WebSocket URL `string`|`` +|readBufferSize|The size in bytes of the read buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb` +|url|URL to use for WebSocket - overrides url one level up (in the HTTP config)|`string`|`` +|writeBufferSize|The size in bytes of the write buffer for the WebSocket connection|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`16Kb` + ## cors |Key|Description|Type|Default Value| diff --git a/internal/ethereum/blocklistener.go b/internal/ethereum/blocklistener.go index 96696b1..4fdecaa 100644 --- a/internal/ethereum/blocklistener.go +++ b/internal/ethereum/blocklistener.go @@ -50,6 +50,8 @@ type blockListener struct { wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected listenLoopDone chan struct{} initialBlockHeightObtained chan struct{} + newHeadsTap chan struct{} + newHeadsSub rpcbackend.Subscription highestBlock int64 mux sync.Mutex consumers map[fftypes.UUID]*blockUpdateConsumer @@ -72,6 +74,7 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section, c: c, backend: c.backend, // use the HTTP backend - might get overwritten by a connected websocket later initialBlockHeightObtained: make(chan struct{}), + newHeadsTap: make(chan struct{}), highestBlock: -1, consumers: make(map[fftypes.UUID]*blockUpdateConsumer), blockPollingInterval: conf.GetDuration(BlockPollingInterval), @@ -89,6 +92,17 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section, return bl, nil } +func (bl *blockListener) newHeadsSubListener() { + for range bl.newHeadsSub.Notifications() { + select { + case bl.newHeadsTap <- struct{}{}: + // Do nothing apart from tap the listener to wake up early + // when there's a notification to the change of the head. + default: + } + } +} + // getBlockHeightWithRetry keeps retrying attempting to get the initial block height until successful func (bl *blockListener) establishBlockHeightWithRetry() error { wsConnected := false @@ -106,9 +120,14 @@ func (bl *blockListener) establishBlockHeightWithRetry() error { // if we retry subscribe, we don't want to retry connect wsConnected = true } - // Once subscribed the backend will keep us subscribed over reconnect - if _, rpcErr := bl.wsBackend.Subscribe(bl.ctx, "newHeads"); rpcErr != nil { - return true, rpcErr.Error() + if bl.newHeadsSub == nil { + // Once subscribed the backend will keep us subscribed over reconnect + sub, rpcErr := bl.wsBackend.Subscribe(bl.ctx, "newHeads") + if rpcErr != nil { + return true, rpcErr.Error() + } + bl.newHeadsSub = sub + go bl.newHeadsSubListener() } // Ok all JSON/RPC from this point on uses our WS Backend, thus ensuring we're // sticky to the same node that the WS is connected to when we're doing queries @@ -149,9 +168,10 @@ func (bl *blockListener) listenLoop() { return } } else { - // Sleep for the polling interval + // Sleep for the polling interval, or until we're shoulder tapped by the newHeads listener select { case <-time.After(bl.blockPollingInterval): + case <-bl.newHeadsTap: case <-bl.ctx.Done(): log.L(bl.ctx).Debugf("Block listener loop stopping") return @@ -489,6 +509,7 @@ func (bl *blockListener) waitClosed() { listenLoopDone := bl.listenLoopDone bl.mux.Unlock() if bl.wsBackend != nil { + _ = bl.wsBackend.UnsubscribeAll(bl.ctx) bl.wsBackend.Close() } if listenLoopDone != nil { diff --git a/internal/ethereum/blocklistener_test.go b/internal/ethereum/blocklistener_test.go index 431ed97..261d9be 100644 --- a/internal/ethereum/blocklistener_test.go +++ b/internal/ethereum/blocklistener_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/wsclient" "github.com/hyperledger/firefly-signer/pkg/ethtypes" @@ -56,71 +57,6 @@ func TestBlockListenerStartGettingHighestBlockRetry(t *testing.T) { } -func TestBlockListenerStartGettingHighestBlockRetryWS(t *testing.T) { - - failedConnectOnce := false - failedSubOnce := false - toServer, fromServer, url, wsDone := wsclient.NewTestWSServer(func(req *http.Request) { - if !failedConnectOnce { - failedConnectOnce = true - panic("fail once here") - } - }) - - ctx, c, _, done := newTestConnector(t) - svrDone := make(chan struct{}) - defer func() { - wsDone() - done() - <-svrDone - }() - bl := c.blockListener - bl.wsBackend = rpcbackend.NewWSRPCClient(&wsclient.WSConfig{ - HTTPURL: url, // ensured to fail - }) - - go func() { - defer close(svrDone) - for { - select { - case rpcStr := <-toServer: - var rpcReq rpcbackend.RPCRequest - err := json.Unmarshal([]byte(rpcStr), &rpcReq) - assert.NoError(t, err) - rpcRes := &rpcbackend.RPCResponse{ - JSONRpc: rpcReq.JSONRpc, - ID: rpcReq.ID, - } - if rpcReq.Method == "eth_blockNumber" { - rpcRes.Result = fftypes.JSONAnyPtr(`"0x12345"`) - } else { - assert.Equal(t, "newHeads", rpcReq.Params[0].AsString()) - if !failedSubOnce { - failedSubOnce = true - rpcRes.Error = &rpcbackend.RPCError{ - Code: int64(rpcbackend.RPCCodeInternalError), - Message: "pop", - } - } else { - rpcRes.Result = fftypes.JSONAnyPtr(fmt.Sprintf(`"%s"`, fftypes.NewUUID())) - } - } - b, err := json.Marshal(rpcRes) - assert.NoError(t, err) - fromServer <- string(b) - case <-ctx.Done(): - return - } - } - }() - - err := bl.establishBlockHeightWithRetry() - assert.NoError(t, err) - assert.True(t, failedConnectOnce) - assert.True(t, failedSubOnce) - -} - func TestBlockListenerStartGettingHighestBlockFailBeforeStop(t *testing.T) { _, c, mRPC, done := newTestConnector(t) @@ -232,6 +168,97 @@ func TestBlockListenerOKSequential(t *testing.T) { } +func TestBlockListenerWSShoulderTap(t *testing.T) { + + failedConnectOnce := false + failedSubOnce := false + toServer, fromServer, url, wsDone := wsclient.NewTestWSServer(func(req *http.Request) { + if !failedConnectOnce { + failedConnectOnce = true + panic("fail once here") + } + }) + + ctx, c, _, done := newTestConnector(t, func(conf config.Section) { + conf.Set(wsclient.WSConfigURL, url) + conf.Set(wsclient.WSConfigKeyInitialConnectAttempts, 0) + conf.Set(WebSocketsEnabled, true) + conf.Set(BlockPollingInterval, "100s") // so the test would just hang if no WS notifications + }) + svrDone := make(chan struct{}) + bl := c.blockListener + + pingerDone := make(chan struct{}) + complete := false + go func() { + defer close(svrDone) + for { + select { + case rpcStr := <-toServer: + var rpcReq rpcbackend.RPCRequest + err := json.Unmarshal([]byte(rpcStr), &rpcReq) + assert.NoError(t, err) + rpcRes := &rpcbackend.RPCResponse{ + JSONRpc: rpcReq.JSONRpc, + ID: rpcReq.ID, + } + switch rpcReq.Method { + case "eth_blockNumber": + rpcRes.Result = fftypes.JSONAnyPtr(`"0x12345"`) + case "eth_subscribe": + assert.Equal(t, "newHeads", rpcReq.Params[0].AsString()) + if !failedSubOnce { + failedSubOnce = true + rpcRes.Error = &rpcbackend.RPCError{ + Code: int64(rpcbackend.RPCCodeInternalError), + Message: "pop", + } + } else { + rpcRes.Result = fftypes.JSONAnyPtr(fmt.Sprintf(`"%s"`, fftypes.NewUUID())) + // Spam with notifications + go func() { + defer close(pingerDone) + for !complete { + time.Sleep(100 * time.Microsecond) + if bl.newHeadsSub != nil { + bl.newHeadsSub.Notifications() <- &rpcbackend.RPCSubscriptionNotification{ + CurrentSubID: bl.newHeadsSub.LocalID().String(), + Result: fftypes.JSONAnyPtr(`"anything"`), + } + } + } + }() + } + case "eth_newBlockFilter": + rpcRes.Result = fftypes.JSONAnyPtr(fmt.Sprintf(`"%s"`, fftypes.NewUUID())) + case "eth_getFilterChanges": + // ok we can close - the shoulder tap worked + complete = true + <-pingerDone + go done() + default: + assert.Fail(t, "unexpected RPC call: %+v", rpcReq) + } + b, err := json.Marshal(rpcRes) + assert.NoError(t, err) + fromServer <- string(b) + case <-ctx.Done(): + return + } + } + }() + + bl.checkStartedLocked() + + // Wait until we close because it worked + <-bl.listenLoopDone + assert.True(t, failedConnectOnce) + assert.True(t, failedSubOnce) + + wsDone() + <-svrDone +} + func TestBlockListenerOKDuplicates(t *testing.T) { _, c, mRPC, done := newTestConnector(t) diff --git a/internal/ethereum/ethereum_test.go b/internal/ethereum/ethereum_test.go index 013cc5a..15d2066 100644 --- a/internal/ethereum/ethereum_test.go +++ b/internal/ethereum/ethereum_test.go @@ -31,7 +31,7 @@ import ( func strPtr(s string) *string { return &s } -func newTestConnector(t *testing.T) (context.Context, *ethConnector, *rpcbackendmocks.Backend, func()) { +func newTestConnector(t *testing.T, confSetup ...func(conf config.Section)) (context.Context, *ethConnector, *rpcbackendmocks.Backend, func()) { mRPC := &rpcbackendmocks.Backend{} config.RootConfigReset() @@ -41,6 +41,9 @@ func newTestConnector(t *testing.T) (context.Context, *ethConnector, *rpcbackend conf.Set(ffresty.HTTPConfigURL, "http://localhost:8545") conf.Set(BlockPollingInterval, "1h") // Disable for tests that are not using it logrus.SetLevel(logrus.DebugLevel) + for _, fn := range confSetup { + fn(conf) + } ctx, done := context.WithCancel(context.Background()) cc, err := NewEthereumConnector(ctx, conf) assert.NoError(t, err) diff --git a/internal/msgs/en_config_descriptions.go b/internal/msgs/en_config_descriptions.go index fc5e664..aba6020 100644 --- a/internal/msgs/en_config_descriptions.go +++ b/internal/msgs/en_config_descriptions.go @@ -28,6 +28,7 @@ var ffc = func(key, translation string, fieldType string) i18n.ConfigMessageKey //revive:disable var ( ConfigEthereumURL = ffc("config.connector.url", "URL of JSON/RPC endpoint for the Ethereum node/gateway", "string") + ConfigEthereumWSEnabled = ffc("config.connector.ws.enabled", "When true a WebSocket is established for block listening, in addition to the HTTP RPC connections used for other functions", i18n.BooleanType) ConfigEthereumDataFormat = ffc("config.connector.dataFormat", "Configure the JSON data format for query output and events", "map,flat_array,self_describing") ConfigEthereumGasEstimationFactor = ffc("config.connector.gasEstimationFactor", "The factor to apply to the gas estimation to determine the gas limit", "float") ConfigBlockCacheSize = ffc("config.connector.blockCacheSize", "Maximum of blocks to hold in the block info cache", i18n.IntType)