From dda04ec8a81232bad7777f32178215bc9e0f33f9 Mon Sep 17 00:00:00 2001 From: 0x6e616d Date: Sat, 10 Aug 2024 18:09:01 +0700 Subject: [PATCH 1/8] feat: add unit tests for the event listener service and block keeper --- internal/pkg/listener/service_test.go | 102 ++++++++++++++++++ internal/pkg/repository/block_keeper_test.go | 93 ++++++++++++++++ .../pkg/testutil/sync_block_inmem_keeper.go | 16 +++ 3 files changed, 211 insertions(+) create mode 100644 internal/pkg/listener/service_test.go create mode 100644 internal/pkg/repository/block_keeper_test.go create mode 100644 internal/pkg/testutil/sync_block_inmem_keeper.go diff --git a/internal/pkg/listener/service_test.go b/internal/pkg/listener/service_test.go new file mode 100644 index 0000000..af1f6ca --- /dev/null +++ b/internal/pkg/listener/service_test.go @@ -0,0 +1,102 @@ +package listener + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/constant" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/repository" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/testutil" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" + "golang.org/x/sync/errgroup" +) + +const ( + rpcUrl = "ws://sepolia.rpc.tokamak.network:8546" +) + +func Test_syncOldBlocks(t *testing.T) { + const ( + totalOldBlocks = uint64(5) + ) + ctx := context.Background() + + bcClient, err := bcclient.New(ctx, rpcUrl) + require.NoError(t, err) + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + keeper, err := repository.NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + listenerSrv, err := MakeService("test-event-listener", bcClient, keeper) + require.NoError(t, err) + + currentHead, err := bcClient.BlockNumber(ctx) + require.NoError(t, err) + + oldBlockNumber := currentHead - totalOldBlocks + consumingBlock, err := bcClient.HeaderAtBlockNumber(ctx, oldBlockNumber) + require.NoError(t, err) + + err = keeper.SetHead(ctx, consumingBlock, constant.ZeroHash) + require.NoError(t, err) + oldBlocksCh := make(chan *types.NewBlock) + + g, _ := errgroup.WithContext(ctx) + g.Go(func() error { + err = listenerSrv.syncOldBlocks(ctx, oldBlocksCh) + defer close(oldBlocksCh) + return err + }) + + i := uint64(0) + blockNo := oldBlockNumber + 1 + for block := range oldBlocksCh { + currentBlockNumber := block.Header.Number.Uint64() + assert.Equal(t, currentBlockNumber, blockNo) + + t.Logf(`Got old block: %d`, block.Header.Number.Uint64()) + i++ + blockNo++ + } + assert.Equal(t, i, totalOldBlocks) + + require.NoError(t, g.Wait()) +} + +func Test_handleReorgBlock(t *testing.T) { + ctx := context.Background() + + bcClient, err := bcclient.New(ctx, rpcUrl) + require.NoError(t, err) + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + keeper, err := repository.NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + listenerSrv, err := MakeService("test-event-listener", bcClient, keeper) + require.NoError(t, err) + block, err := bcClient.GetHeader(ctx) + require.NoError(t, err) + + blocks, err := listenerSrv.handleReorgBlocks(ctx, block) + require.NoError(t, err) + + assert.Len(t, blocks, 0) + + // pass through two blocks + time.Sleep(24 * time.Second) + + block, err = bcClient.GetHeader(ctx) + require.NoError(t, err) + + blocks, err = listenerSrv.handleReorgBlocks(ctx, block) + require.NoError(t, err) + + assert.Equal(t, true, len(blocks) > 0) + +} diff --git a/internal/pkg/repository/block_keeper_test.go b/internal/pkg/repository/block_keeper_test.go new file mode 100644 index 0000000..a32a9b2 --- /dev/null +++ b/internal/pkg/repository/block_keeper_test.go @@ -0,0 +1,93 @@ +package repository + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/testutil" +) + +const ( + rpcUrl = "ws://sepolia.rpc.tokamak.network:8546" +) + +func TestBlockKeeper_initWithExistingBlockHash(t *testing.T) { + ctx := context.Background() + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + + bcClient, err := bcclient.New(ctx, rpcUrl) + require.NoError(t, err) + + blockNo, err := bcClient.BlockNumber(ctx) + require.NoError(t, err) + + block, err := bcClient.HeaderAtBlockNumber(ctx, blockNo) + require.NoError(t, err) + + // set the consuming block hash + err = syncBlockKeeper.SetHead(ctx, block.Hash().String()) + require.NoError(t, err) + + blockKeeper, err := NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + assert.Equal(t, TwoEpochBlocks, blockKeeper.q.Size()) + assert.Equal(t, TwoEpochBlocks, len(blockKeeper.blocks)) + assert.Equal(t, block.Hash(), blockKeeper.head.Hash()) +} + +func TestBlockKeeper_initWithoutExistingBlockHash(t *testing.T) { + ctx := context.Background() + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + + bcClient, err := bcclient.New(ctx, rpcUrl) + require.NoError(t, err) + + currentBlock, err := bcClient.GetHeader(ctx) + require.NoError(t, err) + + blockKeeper, err := NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + assert.Equal(t, TwoEpochBlocks, blockKeeper.q.Size()) + assert.Equal(t, TwoEpochBlocks, len(blockKeeper.blocks)) + assert.Equal(t, currentBlock.Hash(), blockKeeper.head.Hash()) +} + +func TestBlockKeeper_getReorgBlocks(t *testing.T) { + ctx := context.Background() + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + + bcClient, err := bcclient.New(ctx, rpcUrl) + require.NoError(t, err) + + blockNo, err := bcClient.BlockNumber(ctx) + require.NoError(t, err) + + block, err := bcClient.HeaderAtBlockNumber(ctx, blockNo-5) + require.NoError(t, err) + + currentBlock, err := bcClient.GetHeader(ctx) + require.NoError(t, err) + + // set the consuming block hash + err = syncBlockKeeper.SetHead(ctx, block.Hash().String()) + require.NoError(t, err) + + blockKeeper, err := NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + assert.Equal(t, TwoEpochBlocks, blockKeeper.q.Size()) + assert.Equal(t, TwoEpochBlocks, len(blockKeeper.blocks)) + + reorgedBlocks, _, err := blockKeeper.GetReorgHeaders(ctx, currentBlock) + require.NoError(t, err) + + assert.Equal(t, 4, len(reorgedBlocks)) +} diff --git a/internal/pkg/testutil/sync_block_inmem_keeper.go b/internal/pkg/testutil/sync_block_inmem_keeper.go new file mode 100644 index 0000000..1784546 --- /dev/null +++ b/internal/pkg/testutil/sync_block_inmem_keeper.go @@ -0,0 +1,16 @@ +package testutil + +import "context" + +type SyncBlockInMemKeeper struct { + head string +} + +func (k *SyncBlockInMemKeeper) GetHead(ctx context.Context) (string, error) { + return k.head, nil +} + +func (k *SyncBlockInMemKeeper) SetHead(ctx context.Context, head string) error { + k.head = head + return nil +} From 3cd1b8d73c39498714d7424d4f7e6e5ff1b51b5b Mon Sep 17 00:00:00 2001 From: 0x6e616d Date: Sat, 10 Aug 2024 21:30:41 +0700 Subject: [PATCH 2/8] feat: add time.Sleep when subscribing failed and update backOff duration --- internal/pkg/listener/service.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/pkg/listener/service.go b/internal/pkg/listener/service.go index 2146221..a1d5866 100644 --- a/internal/pkg/listener/service.go +++ b/internal/pkg/listener/service.go @@ -6,6 +6,7 @@ import ( "encoding/gob" "fmt" "math" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -139,9 +140,10 @@ func (s *EventService) Start(ctx context.Context) error { return err } - s.sub = event.ResubscribeErr(10, func(ctx context.Context, err error) (event.Subscription, error) { + s.sub = event.ResubscribeErr(5*time.Second, func(ctx context.Context, err error) (event.Subscription, error) { if err != nil { s.l.Errorw("Failed to re-subscribe the event", "err", err) + time.Sleep(1 * time.Second) } return s.subscribeNewHead(ctx) From ea29ff4a9b674e794083aaeafbd71402468f9294 Mon Sep 17 00:00:00 2001 From: 0x6e616d Date: Sun, 11 Aug 2024 03:57:30 +0700 Subject: [PATCH 3/8] chore: batch blocks when syncing the reorg blocks --- internal/pkg/bcclient/client.go | 6 ++-- internal/pkg/listener/service.go | 51 ++++++++++++++------------- internal/pkg/listener/service_test.go | 10 +++--- 3 files changed, 32 insertions(+), 35 deletions(-) diff --git a/internal/pkg/bcclient/client.go b/internal/pkg/bcclient/client.go index aa61b3f..220c52c 100644 --- a/internal/pkg/bcclient/client.go +++ b/internal/pkg/bcclient/client.go @@ -25,7 +25,7 @@ type Client struct { func New(ctx context.Context, rpcURL string) (*Client, error) { httpClient := &http.Client{ - Timeout: 3 * time.Second, + Timeout: 10 * time.Second, } rpcClient, err := rpc.DialOptions(ctx, rpcURL, rpc.WithHTTPClient(httpClient)) if err != nil { @@ -71,15 +71,13 @@ func (c *Client) HeaderAtBlockNumber(ctx context.Context, blockNo uint64) (*ethe } func (c *Client) GetLogs(ctx context.Context, blockHash common.Hash) ([]ethereumTypes.Log, error) { - timeOutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() query := ethereum.FilterQuery{ BlockHash: &blockHash, } // Get the logs - logs, err := c.defaultClient.FilterLogs(timeOutCtx, query) + logs, err := c.defaultClient.FilterLogs(ctx, query) if err != nil { log.GetLogger().Errorw("Failed to retrieve logs", "blockHash", blockHash.Hex(), "err", err) return nil, err diff --git a/internal/pkg/listener/service.go b/internal/pkg/listener/service.go index a1d5866..2cf9a52 100644 --- a/internal/pkg/listener/service.go +++ b/internal/pkg/listener/service.go @@ -317,7 +317,7 @@ func (s *EventService) syncOldBlocks(ctx context.Context, headCh chan *types.New blocksNeedToConsume := onchainBlockNo - consumedBlockNo - totalBatches := int(math.Ceil(float64(blocksNeedToConsume) / float64(MaxBatchBlocksSize))) + totalBatches := calculateBatchBlocks(int(blocksNeedToConsume)) s.l.Infow("Total batches", "total", totalBatches) skip := consumedBlockNo + 1 @@ -358,35 +358,32 @@ func (s *EventService) handleReorgBlocks(ctx context.Context, newHeader *ethereu return nil, fmt.Errorf("reorged block numbers don't match") } - var g errgroup.Group + reorgedBlocks := make([]*types.NewBlock, 0) + totalBatches := calculateBatchBlocks(len(newBlocks)) - reorgedBlocks := make([]*types.NewBlock, len(newBlocks)) - for i, newBlock := range newBlocks { - s.l.Infow("Detect reorg block", "block", newBlock.Number.Uint64()) - i := i - newBlock := newBlock + s.l.Infow("Total batches", "total", totalBatches) + skip := newBlocks[0].Number.Uint64() + to := newBlocks[len(newBlocks)-1].Number.Uint64() + idx := uint64(0) + for i := 0; i < totalBatches; i++ { + fromBlock := skip + toBlock := skip + MaxBatchBlocksSize - 1 - g.Go(func() error { - blockHash := newBlock.Hash() - reorgedLogs, errLogs := s.bcClient.GetLogs(ctx, blockHash) - if errLogs != nil { - s.l.Errorw("Failed to get logs", "err", errLogs) - return errLogs - } + if toBlock > to { + toBlock = to + } - reorgedBlocks[i] = &types.NewBlock{ - Header: newBlock, - Logs: reorgedLogs, - ReorgedBlockHash: reorgedBlockHashes[i], - } + blocks, err := s.bcClient.GetBlocks(ctx, true, fromBlock, toBlock) + if err != nil { + return nil, err + } - return nil - }) - } + for j, block := range blocks { + block.ReorgedBlockHash = reorgedBlockHashes[int(idx)+j] + } - err = g.Wait() - if err != nil { - return nil, err + idx += toBlock - fromBlock + 1 + reorgedBlocks = append(reorgedBlocks, blocks...) } return reorgedBlocks, nil @@ -396,3 +393,7 @@ func serializeEventRequestWithAddressAndABI(address common.Address, hashedABI co result := fmt.Sprintf("%s:%s", address.String(), hashedABI) return result } + +func calculateBatchBlocks(blocks int) int { + return int(math.Ceil(float64(blocks) / float64(MaxBatchBlocksSize))) +} diff --git a/internal/pkg/listener/service_test.go b/internal/pkg/listener/service_test.go index af1f6ca..e1a686e 100644 --- a/internal/pkg/listener/service_test.go +++ b/internal/pkg/listener/service_test.go @@ -80,21 +80,19 @@ func Test_handleReorgBlock(t *testing.T) { listenerSrv, err := MakeService("test-event-listener", bcClient, keeper) require.NoError(t, err) - block, err := bcClient.GetHeader(ctx) + header, err := bcClient.GetHeader(ctx) require.NoError(t, err) - blocks, err := listenerSrv.handleReorgBlocks(ctx, block) + err = keeper.SetHead(ctx, header, constant.ZeroHash) require.NoError(t, err) - assert.Len(t, blocks, 0) - // pass through two blocks time.Sleep(24 * time.Second) - block, err = bcClient.GetHeader(ctx) + header, err = bcClient.GetHeader(ctx) require.NoError(t, err) - blocks, err = listenerSrv.handleReorgBlocks(ctx, block) + blocks, err := listenerSrv.handleReorgBlocks(ctx, header) require.NoError(t, err) assert.Equal(t, true, len(blocks) > 0) From 1290de1ebb6361c5803fb6d87de43c16e326f62b Mon Sep 17 00:00:00 2001 From: 0x6e616d Date: Sun, 11 Aug 2024 11:53:45 +0700 Subject: [PATCH 4/8] chore: comment to clarify in service_test --- internal/pkg/listener/service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/listener/service_test.go b/internal/pkg/listener/service_test.go index e1a686e..1fa4d35 100644 --- a/internal/pkg/listener/service_test.go +++ b/internal/pkg/listener/service_test.go @@ -86,7 +86,7 @@ func Test_handleReorgBlock(t *testing.T) { err = keeper.SetHead(ctx, header, constant.ZeroHash) require.NoError(t, err) - // pass through two blocks + // This causes the gap between the current head in the keeper and the latest head at least two blocks time.Sleep(24 * time.Second) header, err = bcClient.GetHeader(ctx) From 5893e7df232260e61a59f715a76b4c80d8179d9c Mon Sep 17 00:00:00 2001 From: 0x6e616d Date: Sun, 11 Aug 2024 12:13:11 +0700 Subject: [PATCH 5/8] feat: add FetchTokenInfo test --- internal/pkg/erc20/client_test.go | 56 +++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 internal/pkg/erc20/client_test.go diff --git a/internal/pkg/erc20/client_test.go b/internal/pkg/erc20/client_test.go new file mode 100644 index 0000000..7a30ec5 --- /dev/null +++ b/internal/pkg/erc20/client_test.go @@ -0,0 +1,56 @@ +package erc20 + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" +) + +func Test_FetchTokenInfo(t *testing.T) { + ctx := context.Background() + + type testCases []struct { + Expected types.Token + ContractAddress string + } + + bcClient, err := bcclient.New(ctx, "https://sepolia.rpc.tokamak.network") + require.NoError(t, err) + + var tests = testCases{ + { + Expected: types.Token{ + Symbol: "TON", + Decimals: 18, + Address: strings.ToLower("0xa30fe40285B8f5c0457DbC3B7C8A280373c40044"), + }, + ContractAddress: strings.ToLower("0xa30fe40285B8f5c0457DbC3B7C8A280373c40044"), + }, + { + Expected: types.Token{ + Symbol: "TOS", + Decimals: 18, + Address: strings.ToLower("0xFF3Ef745D9878AfE5934Ff0b130868AFDDbc58e8"), + }, + ContractAddress: strings.ToLower("0xFF3Ef745D9878AfE5934Ff0b130868AFDDbc58e8"), + }, + } + t.Parallel() + for _, test := range tests { + t.Run(test.ContractAddress, func(t *testing.T) { + tokenInfo, err := FetchTokenInfo(bcClient, test.ContractAddress) + require.NoError(t, err) + + assert.NotEmpty(t, tokenInfo) + assert.Equal(t, test.Expected.Symbol, tokenInfo.Symbol) + assert.Equal(t, test.Expected.Decimals, tokenInfo.Decimals) + assert.Equal(t, test.ContractAddress, tokenInfo.Address) + }) + } + +} From 1621157e887967339c089f648dbf7665df8bcd4b Mon Sep 17 00:00:00 2001 From: 0x6e616d Date: Mon, 12 Aug 2024 11:52:49 +0700 Subject: [PATCH 6/8] feat: improve bcclient to fetch the requests through httpclient --- cmd/app/flags/flags.go | 16 ++++++++++++++ cmd/app/main.go | 2 ++ internal/app/thanos-notif/app.go | 4 ++-- internal/app/thanos-notif/config.go | 14 ++++++++++-- internal/pkg/bcclient/client.go | 23 ++++++++++++++++---- internal/pkg/erc20/client_test.go | 7 +++++- internal/pkg/listener/service_test.go | 9 ++++---- internal/pkg/repository/block_keeper_test.go | 11 +++++----- 8 files changed, 68 insertions(+), 18 deletions(-) diff --git a/cmd/app/flags/flags.go b/cmd/app/flags/flags.go index 93b75a8..d123027 100644 --- a/cmd/app/flags/flags.go +++ b/cmd/app/flags/flags.go @@ -7,8 +7,10 @@ import ( const ( NetworkFlagName = "network" + L1HttpRpcUrlFlagName = "l1-http-rpc-url" L1WsRpcUrlFlagName = "l1-ws-rpc" L2WsRpcUrlFlagName = "l2-ws-rpc" + L2HttpRpcUrlFlagName = "l2-http-rpc" L1StandardBridgeFlagName = "l1-standard-bridge-address" L2StandardBridgeFlagName = "l2-standard-bridge-address" L1UsdcBridgeFlagName = "l1-usdc-bridge-address" @@ -27,6 +29,12 @@ var ( Usage: "Network name", EnvVars: []string{"NETWORK"}, } + L1HttpRpcFlag = &cli.StringFlag{ + Name: L1HttpRpcUrlFlagName, + Usage: "L1 HTTP RPC url", + Value: "http://localhost:8545", + EnvVars: []string{"L1_HTTP_RPC"}, + } L1WsRpcFlag = &cli.StringFlag{ Name: L1WsRpcUrlFlagName, Usage: "L1 RPC url", @@ -39,6 +47,12 @@ var ( Value: "ws://localhost:9546", EnvVars: []string{"L2_WS_RPC"}, } + L2HttpRpcFlag = &cli.StringFlag{ + Name: L2HttpRpcUrlFlagName, + Usage: "L2 HTTP RPC url", + Value: "http://localhost:9545", + EnvVars: []string{"L2_HTTP_RPC"}, + } L1StandardBridgeFlag = &cli.StringFlag{ Name: L1StandardBridgeFlagName, Usage: "L1StandardBridge address", @@ -97,7 +111,9 @@ func Flags() []cli.Flag { return []cli.Flag{ NetworkFlag, L1WsRpcFlag, + L1HttpRpcFlag, L2WsRpcFlag, + L2HttpRpcFlag, L1StandardBridgeFlag, L2StandardBridgeFlag, L1UsdcBridgeFlag, diff --git a/cmd/app/main.go b/cmd/app/main.go index 63c0bd0..7221265 100644 --- a/cmd/app/main.go +++ b/cmd/app/main.go @@ -35,7 +35,9 @@ func startListener(ctx *cli.Context) error { config := &thanosnotif.Config{ Network: ctx.String(flags.NetworkFlagName), L1WsRpc: ctx.String(flags.L1WsRpcUrlFlagName), + L1HttpRpc: ctx.String(flags.L1HttpRpcUrlFlagName), L2WsRpc: ctx.String(flags.L2WsRpcUrlFlagName), + L2HttpRpc: ctx.String(flags.L2HttpRpcUrlFlagName), L1StandardBridge: ctx.String(flags.L1StandardBridgeFlagName), L2StandardBridge: ctx.String(flags.L2StandardBridgeFlagName), L1UsdcBridge: ctx.String(flags.L1UsdcBridgeFlagName), diff --git a/internal/app/thanos-notif/app.go b/internal/app/thanos-notif/app.go index c2d7ec5..838f1ab 100644 --- a/internal/app/thanos-notif/app.go +++ b/internal/app/thanos-notif/app.go @@ -42,13 +42,13 @@ func New(ctx context.Context, cfg *Config) (*App, error) { return nil, err } - l1Client, err := bcclient.New(ctx, cfg.L1WsRpc) + l1Client, err := bcclient.New(ctx, cfg.L1WsRpc, cfg.L1HttpRpc) if err != nil { log.GetLogger().Errorw("Failed to create L1 client", "error", err) return nil, err } - l2Client, err := bcclient.New(ctx, cfg.L2WsRpc) + l2Client, err := bcclient.New(ctx, cfg.L2WsRpc, cfg.L2HttpRpc) if err != nil { log.GetLogger().Errorw("Failed to create L2 client", "error", err) return nil, err diff --git a/internal/app/thanos-notif/config.go b/internal/app/thanos-notif/config.go index 62ff0fa..0cf7e7c 100644 --- a/internal/app/thanos-notif/config.go +++ b/internal/app/thanos-notif/config.go @@ -9,9 +9,11 @@ import ( type Config struct { Network string - L1WsRpc string + L1HttpRpc string + L1WsRpc string - L2WsRpc string + L2HttpRpc string + L2WsRpc string L1StandardBridge string L2StandardBridge string @@ -35,10 +37,18 @@ func (c *Config) Validate() error { return errors.New("l1 ws rpc address is required") } + if c.L1HttpRpc == "" { + return errors.New("l1 http rpc address is required") + } + if c.L2WsRpc == "" { return errors.New("l2 ws rpc address is required") } + if c.L2HttpRpc == "" { + return errors.New("l2 http rpc address is required") + } + if c.L1StandardBridge == "" { return errors.New("l1 standard bridge is required") } diff --git a/internal/pkg/bcclient/client.go b/internal/pkg/bcclient/client.go index 220c52c..85d933a 100644 --- a/internal/pkg/bcclient/client.go +++ b/internal/pkg/bcclient/client.go @@ -20,19 +20,23 @@ import ( type Client struct { defaultClient *ethclient.Client + wsClient *ethclient.Client chainID *big.Int } -func New(ctx context.Context, rpcURL string) (*Client, error) { +func New(ctx context.Context, wsURL, rpcURL string) (*Client, error) { httpClient := &http.Client{ Timeout: 10 * time.Second, } - rpcClient, err := rpc.DialOptions(ctx, rpcURL, rpc.WithHTTPClient(httpClient)) + ethClient, err := initEthClient(ctx, rpcURL, httpClient) if err != nil { return nil, err } - ethClient := ethclient.NewClient(rpcClient) + wsClient, err := initEthClient(ctx, wsURL, httpClient) + if err != nil { + return nil, err + } chainID, err := ethClient.ChainID(ctx) if err != nil { @@ -41,16 +45,27 @@ func New(ctx context.Context, rpcURL string) (*Client, error) { return &Client{ defaultClient: ethClient, + wsClient: wsClient, chainID: chainID, }, nil } +func initEthClient(ctx context.Context, url string, httpClient *http.Client) (*ethclient.Client, error) { + rpcClient, err := rpc.DialOptions(ctx, url, rpc.WithHTTPClient(httpClient)) + if err != nil { + return nil, err + } + + ethClient := ethclient.NewClient(rpcClient) + return ethClient, nil +} + func (c *Client) GetClient() *ethclient.Client { return c.defaultClient } func (c *Client) SubscribeNewHead(ctx context.Context, newHeadCh chan<- *ethereumTypes.Header) (ethereum.Subscription, error) { - return c.defaultClient.SubscribeNewHead(ctx, newHeadCh) + return c.wsClient.SubscribeNewHead(ctx, newHeadCh) } func (c *Client) BlockNumber(ctx context.Context) (uint64, error) { diff --git a/internal/pkg/erc20/client_test.go b/internal/pkg/erc20/client_test.go index 7a30ec5..aec81ed 100644 --- a/internal/pkg/erc20/client_test.go +++ b/internal/pkg/erc20/client_test.go @@ -11,6 +11,11 @@ import ( "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" ) +var ( + wsUrl = "ws://sepolia.rpc.tokamak.network:8546" + httpUrl = "https://sepolia.rpc.tokamak.network" +) + func Test_FetchTokenInfo(t *testing.T) { ctx := context.Background() @@ -19,7 +24,7 @@ func Test_FetchTokenInfo(t *testing.T) { ContractAddress string } - bcClient, err := bcclient.New(ctx, "https://sepolia.rpc.tokamak.network") + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) require.NoError(t, err) var tests = testCases{ diff --git a/internal/pkg/listener/service_test.go b/internal/pkg/listener/service_test.go index 1fa4d35..d502b96 100644 --- a/internal/pkg/listener/service_test.go +++ b/internal/pkg/listener/service_test.go @@ -15,8 +15,9 @@ import ( "golang.org/x/sync/errgroup" ) -const ( - rpcUrl = "ws://sepolia.rpc.tokamak.network:8546" +var ( + wsUrl = "ws://sepolia.rpc.tokamak.network:8546" + httpUrl = "https://sepolia.rpc.tokamak.network" ) func Test_syncOldBlocks(t *testing.T) { @@ -25,7 +26,7 @@ func Test_syncOldBlocks(t *testing.T) { ) ctx := context.Background() - bcClient, err := bcclient.New(ctx, rpcUrl) + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) require.NoError(t, err) syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} @@ -71,7 +72,7 @@ func Test_syncOldBlocks(t *testing.T) { func Test_handleReorgBlock(t *testing.T) { ctx := context.Background() - bcClient, err := bcclient.New(ctx, rpcUrl) + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) require.NoError(t, err) syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} diff --git a/internal/pkg/repository/block_keeper_test.go b/internal/pkg/repository/block_keeper_test.go index a32a9b2..8a12a77 100644 --- a/internal/pkg/repository/block_keeper_test.go +++ b/internal/pkg/repository/block_keeper_test.go @@ -10,8 +10,9 @@ import ( "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/testutil" ) -const ( - rpcUrl = "ws://sepolia.rpc.tokamak.network:8546" +var ( + wsUrl = "ws://sepolia.rpc.tokamak.network:8546" + httpUrl = "https://sepolia.rpc.tokamak.network" ) func TestBlockKeeper_initWithExistingBlockHash(t *testing.T) { @@ -19,7 +20,7 @@ func TestBlockKeeper_initWithExistingBlockHash(t *testing.T) { syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} - bcClient, err := bcclient.New(ctx, rpcUrl) + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) require.NoError(t, err) blockNo, err := bcClient.BlockNumber(ctx) @@ -45,7 +46,7 @@ func TestBlockKeeper_initWithoutExistingBlockHash(t *testing.T) { syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} - bcClient, err := bcclient.New(ctx, rpcUrl) + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) require.NoError(t, err) currentBlock, err := bcClient.GetHeader(ctx) @@ -64,7 +65,7 @@ func TestBlockKeeper_getReorgBlocks(t *testing.T) { syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} - bcClient, err := bcclient.New(ctx, rpcUrl) + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) require.NoError(t, err) blockNo, err := bcClient.BlockNumber(ctx) From 41cdf58ecf8e12466c8d93b47f3d51ef1b1e2aa5 Mon Sep 17 00:00:00 2001 From: 0x6e616d Date: Mon, 12 Aug 2024 12:07:49 +0700 Subject: [PATCH 7/8] chore: rename service name on ci.yaml --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 70a7491..39fb4eb 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -15,7 +15,7 @@ on: - "*" env: - SERVICE: node-proxy + SERVICE: thanos-event-listener jobs: prepare: From ab18331cdf38e21985003e74af4bb178f23e355b Mon Sep 17 00:00:00 2001 From: 0x6e616d Date: Fri, 16 Aug 2024 15:46:38 +0700 Subject: [PATCH 8/8] feat: retry to get logs three times if failed --- internal/pkg/bcclient/client.go | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/internal/pkg/bcclient/client.go b/internal/pkg/bcclient/client.go index 85d933a..23a5c49 100644 --- a/internal/pkg/bcclient/client.go +++ b/internal/pkg/bcclient/client.go @@ -86,19 +86,25 @@ func (c *Client) HeaderAtBlockNumber(ctx context.Context, blockNo uint64) (*ethe } func (c *Client) GetLogs(ctx context.Context, blockHash common.Hash) ([]ethereumTypes.Log, error) { - - query := ethereum.FilterQuery{ - BlockHash: &blockHash, - } - - // Get the logs - logs, err := c.defaultClient.FilterLogs(ctx, query) - if err != nil { - log.GetLogger().Errorw("Failed to retrieve logs", "blockHash", blockHash.Hex(), "err", err) - return nil, err + var err error + var logs []ethereumTypes.Log + for i := 0; i < 3; i++ { + query := ethereum.FilterQuery{ + BlockHash: &blockHash, + } + + // Get the logs + logs, err = c.defaultClient.FilterLogs(ctx, query) + if err != nil { + log.GetLogger().Errorw("Failed to retrieve logs", "err", err) + time.Sleep(5 * time.Second) + continue + } + + return logs, nil } - return logs, nil + return nil, err } func (c *Client) HeaderAtBlockHash(ctx context.Context, blockHash common.Hash) (*ethereumTypes.Header, error) {