diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 70a7491..39fb4eb 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -15,7 +15,7 @@ on: - "*" env: - SERVICE: node-proxy + SERVICE: thanos-event-listener jobs: prepare: diff --git a/cmd/app/flags/flags.go b/cmd/app/flags/flags.go index 93b75a8..d123027 100644 --- a/cmd/app/flags/flags.go +++ b/cmd/app/flags/flags.go @@ -7,8 +7,10 @@ import ( const ( NetworkFlagName = "network" + L1HttpRpcUrlFlagName = "l1-http-rpc-url" L1WsRpcUrlFlagName = "l1-ws-rpc" L2WsRpcUrlFlagName = "l2-ws-rpc" + L2HttpRpcUrlFlagName = "l2-http-rpc" L1StandardBridgeFlagName = "l1-standard-bridge-address" L2StandardBridgeFlagName = "l2-standard-bridge-address" L1UsdcBridgeFlagName = "l1-usdc-bridge-address" @@ -27,6 +29,12 @@ var ( Usage: "Network name", EnvVars: []string{"NETWORK"}, } + L1HttpRpcFlag = &cli.StringFlag{ + Name: L1HttpRpcUrlFlagName, + Usage: "L1 HTTP RPC url", + Value: "http://localhost:8545", + EnvVars: []string{"L1_HTTP_RPC"}, + } L1WsRpcFlag = &cli.StringFlag{ Name: L1WsRpcUrlFlagName, Usage: "L1 RPC url", @@ -39,6 +47,12 @@ var ( Value: "ws://localhost:9546", EnvVars: []string{"L2_WS_RPC"}, } + L2HttpRpcFlag = &cli.StringFlag{ + Name: L2HttpRpcUrlFlagName, + Usage: "L2 HTTP RPC url", + Value: "http://localhost:9545", + EnvVars: []string{"L2_HTTP_RPC"}, + } L1StandardBridgeFlag = &cli.StringFlag{ Name: L1StandardBridgeFlagName, Usage: "L1StandardBridge address", @@ -97,7 +111,9 @@ func Flags() []cli.Flag { return []cli.Flag{ NetworkFlag, L1WsRpcFlag, + L1HttpRpcFlag, L2WsRpcFlag, + L2HttpRpcFlag, L1StandardBridgeFlag, L2StandardBridgeFlag, L1UsdcBridgeFlag, diff --git a/cmd/app/main.go b/cmd/app/main.go index 63c0bd0..7221265 100644 --- a/cmd/app/main.go +++ b/cmd/app/main.go @@ -35,7 +35,9 @@ func startListener(ctx *cli.Context) error { config := &thanosnotif.Config{ Network: ctx.String(flags.NetworkFlagName), L1WsRpc: ctx.String(flags.L1WsRpcUrlFlagName), + L1HttpRpc: ctx.String(flags.L1HttpRpcUrlFlagName), L2WsRpc: ctx.String(flags.L2WsRpcUrlFlagName), + L2HttpRpc: ctx.String(flags.L2HttpRpcUrlFlagName), L1StandardBridge: ctx.String(flags.L1StandardBridgeFlagName), L2StandardBridge: ctx.String(flags.L2StandardBridgeFlagName), L1UsdcBridge: ctx.String(flags.L1UsdcBridgeFlagName), diff --git a/internal/app/thanos-notif/app.go b/internal/app/thanos-notif/app.go index c2d7ec5..838f1ab 100644 --- a/internal/app/thanos-notif/app.go +++ b/internal/app/thanos-notif/app.go @@ -42,13 +42,13 @@ func New(ctx context.Context, cfg *Config) (*App, error) { return nil, err } - l1Client, err := bcclient.New(ctx, cfg.L1WsRpc) + l1Client, err := bcclient.New(ctx, cfg.L1WsRpc, cfg.L1HttpRpc) if err != nil { log.GetLogger().Errorw("Failed to create L1 client", "error", err) return nil, err } - l2Client, err := bcclient.New(ctx, cfg.L2WsRpc) + l2Client, err := bcclient.New(ctx, cfg.L2WsRpc, cfg.L2HttpRpc) if err != nil { log.GetLogger().Errorw("Failed to create L2 client", "error", err) return nil, err diff --git a/internal/app/thanos-notif/config.go b/internal/app/thanos-notif/config.go index 62ff0fa..0cf7e7c 100644 --- a/internal/app/thanos-notif/config.go +++ b/internal/app/thanos-notif/config.go @@ -9,9 +9,11 @@ import ( type Config struct { Network string - L1WsRpc string + L1HttpRpc string + L1WsRpc string - L2WsRpc string + L2HttpRpc string + L2WsRpc string L1StandardBridge string L2StandardBridge string @@ -35,10 +37,18 @@ func (c *Config) Validate() error { return errors.New("l1 ws rpc address is required") } + if c.L1HttpRpc == "" { + return errors.New("l1 http rpc address is required") + } + if c.L2WsRpc == "" { return errors.New("l2 ws rpc address is required") } + if c.L2HttpRpc == "" { + return errors.New("l2 http rpc address is required") + } + if c.L1StandardBridge == "" { return errors.New("l1 standard bridge is required") } diff --git a/internal/pkg/bcclient/client.go b/internal/pkg/bcclient/client.go index aa61b3f..23a5c49 100644 --- a/internal/pkg/bcclient/client.go +++ b/internal/pkg/bcclient/client.go @@ -20,19 +20,23 @@ import ( type Client struct { defaultClient *ethclient.Client + wsClient *ethclient.Client chainID *big.Int } -func New(ctx context.Context, rpcURL string) (*Client, error) { +func New(ctx context.Context, wsURL, rpcURL string) (*Client, error) { httpClient := &http.Client{ - Timeout: 3 * time.Second, + Timeout: 10 * time.Second, } - rpcClient, err := rpc.DialOptions(ctx, rpcURL, rpc.WithHTTPClient(httpClient)) + ethClient, err := initEthClient(ctx, rpcURL, httpClient) if err != nil { return nil, err } - ethClient := ethclient.NewClient(rpcClient) + wsClient, err := initEthClient(ctx, wsURL, httpClient) + if err != nil { + return nil, err + } chainID, err := ethClient.ChainID(ctx) if err != nil { @@ -41,16 +45,27 @@ func New(ctx context.Context, rpcURL string) (*Client, error) { return &Client{ defaultClient: ethClient, + wsClient: wsClient, chainID: chainID, }, nil } +func initEthClient(ctx context.Context, url string, httpClient *http.Client) (*ethclient.Client, error) { + rpcClient, err := rpc.DialOptions(ctx, url, rpc.WithHTTPClient(httpClient)) + if err != nil { + return nil, err + } + + ethClient := ethclient.NewClient(rpcClient) + return ethClient, nil +} + func (c *Client) GetClient() *ethclient.Client { return c.defaultClient } func (c *Client) SubscribeNewHead(ctx context.Context, newHeadCh chan<- *ethereumTypes.Header) (ethereum.Subscription, error) { - return c.defaultClient.SubscribeNewHead(ctx, newHeadCh) + return c.wsClient.SubscribeNewHead(ctx, newHeadCh) } func (c *Client) BlockNumber(ctx context.Context) (uint64, error) { @@ -71,21 +86,25 @@ func (c *Client) HeaderAtBlockNumber(ctx context.Context, blockNo uint64) (*ethe } func (c *Client) GetLogs(ctx context.Context, blockHash common.Hash) ([]ethereumTypes.Log, error) { - timeOutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - query := ethereum.FilterQuery{ - BlockHash: &blockHash, - } - - // Get the logs - logs, err := c.defaultClient.FilterLogs(timeOutCtx, query) - if err != nil { - log.GetLogger().Errorw("Failed to retrieve logs", "blockHash", blockHash.Hex(), "err", err) - return nil, err + var err error + var logs []ethereumTypes.Log + for i := 0; i < 3; i++ { + query := ethereum.FilterQuery{ + BlockHash: &blockHash, + } + + // Get the logs + logs, err = c.defaultClient.FilterLogs(ctx, query) + if err != nil { + log.GetLogger().Errorw("Failed to retrieve logs", "err", err) + time.Sleep(5 * time.Second) + continue + } + + return logs, nil } - return logs, nil + return nil, err } func (c *Client) HeaderAtBlockHash(ctx context.Context, blockHash common.Hash) (*ethereumTypes.Header, error) { diff --git a/internal/pkg/erc20/client_test.go b/internal/pkg/erc20/client_test.go new file mode 100644 index 0000000..aec81ed --- /dev/null +++ b/internal/pkg/erc20/client_test.go @@ -0,0 +1,61 @@ +package erc20 + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" +) + +var ( + wsUrl = "ws://sepolia.rpc.tokamak.network:8546" + httpUrl = "https://sepolia.rpc.tokamak.network" +) + +func Test_FetchTokenInfo(t *testing.T) { + ctx := context.Background() + + type testCases []struct { + Expected types.Token + ContractAddress string + } + + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) + require.NoError(t, err) + + var tests = testCases{ + { + Expected: types.Token{ + Symbol: "TON", + Decimals: 18, + Address: strings.ToLower("0xa30fe40285B8f5c0457DbC3B7C8A280373c40044"), + }, + ContractAddress: strings.ToLower("0xa30fe40285B8f5c0457DbC3B7C8A280373c40044"), + }, + { + Expected: types.Token{ + Symbol: "TOS", + Decimals: 18, + Address: strings.ToLower("0xFF3Ef745D9878AfE5934Ff0b130868AFDDbc58e8"), + }, + ContractAddress: strings.ToLower("0xFF3Ef745D9878AfE5934Ff0b130868AFDDbc58e8"), + }, + } + t.Parallel() + for _, test := range tests { + t.Run(test.ContractAddress, func(t *testing.T) { + tokenInfo, err := FetchTokenInfo(bcClient, test.ContractAddress) + require.NoError(t, err) + + assert.NotEmpty(t, tokenInfo) + assert.Equal(t, test.Expected.Symbol, tokenInfo.Symbol) + assert.Equal(t, test.Expected.Decimals, tokenInfo.Decimals) + assert.Equal(t, test.ContractAddress, tokenInfo.Address) + }) + } + +} diff --git a/internal/pkg/listener/service.go b/internal/pkg/listener/service.go index 2146221..2cf9a52 100644 --- a/internal/pkg/listener/service.go +++ b/internal/pkg/listener/service.go @@ -6,6 +6,7 @@ import ( "encoding/gob" "fmt" "math" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -139,9 +140,10 @@ func (s *EventService) Start(ctx context.Context) error { return err } - s.sub = event.ResubscribeErr(10, func(ctx context.Context, err error) (event.Subscription, error) { + s.sub = event.ResubscribeErr(5*time.Second, func(ctx context.Context, err error) (event.Subscription, error) { if err != nil { s.l.Errorw("Failed to re-subscribe the event", "err", err) + time.Sleep(1 * time.Second) } return s.subscribeNewHead(ctx) @@ -315,7 +317,7 @@ func (s *EventService) syncOldBlocks(ctx context.Context, headCh chan *types.New blocksNeedToConsume := onchainBlockNo - consumedBlockNo - totalBatches := int(math.Ceil(float64(blocksNeedToConsume) / float64(MaxBatchBlocksSize))) + totalBatches := calculateBatchBlocks(int(blocksNeedToConsume)) s.l.Infow("Total batches", "total", totalBatches) skip := consumedBlockNo + 1 @@ -356,35 +358,32 @@ func (s *EventService) handleReorgBlocks(ctx context.Context, newHeader *ethereu return nil, fmt.Errorf("reorged block numbers don't match") } - var g errgroup.Group + reorgedBlocks := make([]*types.NewBlock, 0) + totalBatches := calculateBatchBlocks(len(newBlocks)) - reorgedBlocks := make([]*types.NewBlock, len(newBlocks)) - for i, newBlock := range newBlocks { - s.l.Infow("Detect reorg block", "block", newBlock.Number.Uint64()) - i := i - newBlock := newBlock + s.l.Infow("Total batches", "total", totalBatches) + skip := newBlocks[0].Number.Uint64() + to := newBlocks[len(newBlocks)-1].Number.Uint64() + idx := uint64(0) + for i := 0; i < totalBatches; i++ { + fromBlock := skip + toBlock := skip + MaxBatchBlocksSize - 1 - g.Go(func() error { - blockHash := newBlock.Hash() - reorgedLogs, errLogs := s.bcClient.GetLogs(ctx, blockHash) - if errLogs != nil { - s.l.Errorw("Failed to get logs", "err", errLogs) - return errLogs - } + if toBlock > to { + toBlock = to + } - reorgedBlocks[i] = &types.NewBlock{ - Header: newBlock, - Logs: reorgedLogs, - ReorgedBlockHash: reorgedBlockHashes[i], - } + blocks, err := s.bcClient.GetBlocks(ctx, true, fromBlock, toBlock) + if err != nil { + return nil, err + } - return nil - }) - } + for j, block := range blocks { + block.ReorgedBlockHash = reorgedBlockHashes[int(idx)+j] + } - err = g.Wait() - if err != nil { - return nil, err + idx += toBlock - fromBlock + 1 + reorgedBlocks = append(reorgedBlocks, blocks...) } return reorgedBlocks, nil @@ -394,3 +393,7 @@ func serializeEventRequestWithAddressAndABI(address common.Address, hashedABI co result := fmt.Sprintf("%s:%s", address.String(), hashedABI) return result } + +func calculateBatchBlocks(blocks int) int { + return int(math.Ceil(float64(blocks) / float64(MaxBatchBlocksSize))) +} diff --git a/internal/pkg/listener/service_test.go b/internal/pkg/listener/service_test.go new file mode 100644 index 0000000..d502b96 --- /dev/null +++ b/internal/pkg/listener/service_test.go @@ -0,0 +1,101 @@ +package listener + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/constant" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/repository" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/testutil" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types" + "golang.org/x/sync/errgroup" +) + +var ( + wsUrl = "ws://sepolia.rpc.tokamak.network:8546" + httpUrl = "https://sepolia.rpc.tokamak.network" +) + +func Test_syncOldBlocks(t *testing.T) { + const ( + totalOldBlocks = uint64(5) + ) + ctx := context.Background() + + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) + require.NoError(t, err) + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + keeper, err := repository.NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + listenerSrv, err := MakeService("test-event-listener", bcClient, keeper) + require.NoError(t, err) + + currentHead, err := bcClient.BlockNumber(ctx) + require.NoError(t, err) + + oldBlockNumber := currentHead - totalOldBlocks + consumingBlock, err := bcClient.HeaderAtBlockNumber(ctx, oldBlockNumber) + require.NoError(t, err) + + err = keeper.SetHead(ctx, consumingBlock, constant.ZeroHash) + require.NoError(t, err) + oldBlocksCh := make(chan *types.NewBlock) + + g, _ := errgroup.WithContext(ctx) + g.Go(func() error { + err = listenerSrv.syncOldBlocks(ctx, oldBlocksCh) + defer close(oldBlocksCh) + return err + }) + + i := uint64(0) + blockNo := oldBlockNumber + 1 + for block := range oldBlocksCh { + currentBlockNumber := block.Header.Number.Uint64() + assert.Equal(t, currentBlockNumber, blockNo) + + t.Logf(`Got old block: %d`, block.Header.Number.Uint64()) + i++ + blockNo++ + } + assert.Equal(t, i, totalOldBlocks) + + require.NoError(t, g.Wait()) +} + +func Test_handleReorgBlock(t *testing.T) { + ctx := context.Background() + + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) + require.NoError(t, err) + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + keeper, err := repository.NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + listenerSrv, err := MakeService("test-event-listener", bcClient, keeper) + require.NoError(t, err) + header, err := bcClient.GetHeader(ctx) + require.NoError(t, err) + + err = keeper.SetHead(ctx, header, constant.ZeroHash) + require.NoError(t, err) + + // This causes the gap between the current head in the keeper and the latest head at least two blocks + time.Sleep(24 * time.Second) + + header, err = bcClient.GetHeader(ctx) + require.NoError(t, err) + + blocks, err := listenerSrv.handleReorgBlocks(ctx, header) + require.NoError(t, err) + + assert.Equal(t, true, len(blocks) > 0) + +} diff --git a/internal/pkg/repository/block_keeper_test.go b/internal/pkg/repository/block_keeper_test.go new file mode 100644 index 0000000..8a12a77 --- /dev/null +++ b/internal/pkg/repository/block_keeper_test.go @@ -0,0 +1,94 @@ +package repository + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient" + "github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/testutil" +) + +var ( + wsUrl = "ws://sepolia.rpc.tokamak.network:8546" + httpUrl = "https://sepolia.rpc.tokamak.network" +) + +func TestBlockKeeper_initWithExistingBlockHash(t *testing.T) { + ctx := context.Background() + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) + require.NoError(t, err) + + blockNo, err := bcClient.BlockNumber(ctx) + require.NoError(t, err) + + block, err := bcClient.HeaderAtBlockNumber(ctx, blockNo) + require.NoError(t, err) + + // set the consuming block hash + err = syncBlockKeeper.SetHead(ctx, block.Hash().String()) + require.NoError(t, err) + + blockKeeper, err := NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + assert.Equal(t, TwoEpochBlocks, blockKeeper.q.Size()) + assert.Equal(t, TwoEpochBlocks, len(blockKeeper.blocks)) + assert.Equal(t, block.Hash(), blockKeeper.head.Hash()) +} + +func TestBlockKeeper_initWithoutExistingBlockHash(t *testing.T) { + ctx := context.Background() + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) + require.NoError(t, err) + + currentBlock, err := bcClient.GetHeader(ctx) + require.NoError(t, err) + + blockKeeper, err := NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + assert.Equal(t, TwoEpochBlocks, blockKeeper.q.Size()) + assert.Equal(t, TwoEpochBlocks, len(blockKeeper.blocks)) + assert.Equal(t, currentBlock.Hash(), blockKeeper.head.Hash()) +} + +func TestBlockKeeper_getReorgBlocks(t *testing.T) { + ctx := context.Background() + + syncBlockKeeper := &testutil.SyncBlockInMemKeeper{} + + bcClient, err := bcclient.New(ctx, wsUrl, httpUrl) + require.NoError(t, err) + + blockNo, err := bcClient.BlockNumber(ctx) + require.NoError(t, err) + + block, err := bcClient.HeaderAtBlockNumber(ctx, blockNo-5) + require.NoError(t, err) + + currentBlock, err := bcClient.GetHeader(ctx) + require.NoError(t, err) + + // set the consuming block hash + err = syncBlockKeeper.SetHead(ctx, block.Hash().String()) + require.NoError(t, err) + + blockKeeper, err := NewBlockKeeper(ctx, bcClient, syncBlockKeeper) + require.NoError(t, err) + + assert.Equal(t, TwoEpochBlocks, blockKeeper.q.Size()) + assert.Equal(t, TwoEpochBlocks, len(blockKeeper.blocks)) + + reorgedBlocks, _, err := blockKeeper.GetReorgHeaders(ctx, currentBlock) + require.NoError(t, err) + + assert.Equal(t, 4, len(reorgedBlocks)) +} diff --git a/internal/pkg/testutil/sync_block_inmem_keeper.go b/internal/pkg/testutil/sync_block_inmem_keeper.go new file mode 100644 index 0000000..1784546 --- /dev/null +++ b/internal/pkg/testutil/sync_block_inmem_keeper.go @@ -0,0 +1,16 @@ +package testutil + +import "context" + +type SyncBlockInMemKeeper struct { + head string +} + +func (k *SyncBlockInMemKeeper) GetHead(ctx context.Context) (string, error) { + return k.head, nil +} + +func (k *SyncBlockInMemKeeper) SetHead(ctx context.Context, head string) error { + k.head = head + return nil +}