Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add unit tests for the event listener service and block keeper #12

Merged
merged 8 commits into from
Aug 16, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ on:
- "*"

env:
SERVICE: node-proxy
SERVICE: thanos-event-listener

jobs:
prepare:
Expand Down
16 changes: 16 additions & 0 deletions cmd/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (

const (
NetworkFlagName = "network"
L1HttpRpcUrlFlagName = "l1-http-rpc-url"
L1WsRpcUrlFlagName = "l1-ws-rpc"
L2WsRpcUrlFlagName = "l2-ws-rpc"
L2HttpRpcUrlFlagName = "l2-http-rpc"
L1StandardBridgeFlagName = "l1-standard-bridge-address"
L2StandardBridgeFlagName = "l2-standard-bridge-address"
L1UsdcBridgeFlagName = "l1-usdc-bridge-address"
Expand All @@ -27,6 +29,12 @@ var (
Usage: "Network name",
EnvVars: []string{"NETWORK"},
}
L1HttpRpcFlag = &cli.StringFlag{
Name: L1HttpRpcUrlFlagName,
Usage: "L1 HTTP RPC url",
Value: "http://localhost:8545",
EnvVars: []string{"L1_HTTP_RPC"},
}
L1WsRpcFlag = &cli.StringFlag{
Name: L1WsRpcUrlFlagName,
Usage: "L1 RPC url",
Expand All @@ -39,6 +47,12 @@ var (
Value: "ws://localhost:9546",
EnvVars: []string{"L2_WS_RPC"},
}
L2HttpRpcFlag = &cli.StringFlag{
Name: L2HttpRpcUrlFlagName,
Usage: "L2 HTTP RPC url",
Value: "http://localhost:9545",
EnvVars: []string{"L2_HTTP_RPC"},
}
L1StandardBridgeFlag = &cli.StringFlag{
Name: L1StandardBridgeFlagName,
Usage: "L1StandardBridge address",
Expand Down Expand Up @@ -97,7 +111,9 @@ func Flags() []cli.Flag {
return []cli.Flag{
NetworkFlag,
L1WsRpcFlag,
L1HttpRpcFlag,
L2WsRpcFlag,
L2HttpRpcFlag,
L1StandardBridgeFlag,
L2StandardBridgeFlag,
L1UsdcBridgeFlag,
Expand Down
2 changes: 2 additions & 0 deletions cmd/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func startListener(ctx *cli.Context) error {
config := &thanosnotif.Config{
Network: ctx.String(flags.NetworkFlagName),
L1WsRpc: ctx.String(flags.L1WsRpcUrlFlagName),
L1HttpRpc: ctx.String(flags.L1HttpRpcUrlFlagName),
L2WsRpc: ctx.String(flags.L2WsRpcUrlFlagName),
L2HttpRpc: ctx.String(flags.L2HttpRpcUrlFlagName),
L1StandardBridge: ctx.String(flags.L1StandardBridgeFlagName),
L2StandardBridge: ctx.String(flags.L2StandardBridgeFlagName),
L1UsdcBridge: ctx.String(flags.L1UsdcBridgeFlagName),
Expand Down
4 changes: 2 additions & 2 deletions internal/app/thanos-notif/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func New(ctx context.Context, cfg *Config) (*App, error) {
return nil, err
}

l1Client, err := bcclient.New(ctx, cfg.L1WsRpc)
l1Client, err := bcclient.New(ctx, cfg.L1WsRpc, cfg.L1HttpRpc)
if err != nil {
log.GetLogger().Errorw("Failed to create L1 client", "error", err)
return nil, err
}

l2Client, err := bcclient.New(ctx, cfg.L2WsRpc)
l2Client, err := bcclient.New(ctx, cfg.L2WsRpc, cfg.L2HttpRpc)
if err != nil {
log.GetLogger().Errorw("Failed to create L2 client", "error", err)
return nil, err
Expand Down
14 changes: 12 additions & 2 deletions internal/app/thanos-notif/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
type Config struct {
Network string

L1WsRpc string
L1HttpRpc string
L1WsRpc string

L2WsRpc string
L2HttpRpc string
L2WsRpc string

L1StandardBridge string
L2StandardBridge string
Expand All @@ -35,10 +37,18 @@ func (c *Config) Validate() error {
return errors.New("l1 ws rpc address is required")
}

if c.L1HttpRpc == "" {
return errors.New("l1 http rpc address is required")
}

if c.L2WsRpc == "" {
return errors.New("l2 ws rpc address is required")
}

if c.L2HttpRpc == "" {
return errors.New("l2 http rpc address is required")
}

if c.L1StandardBridge == "" {
return errors.New("l1 standard bridge is required")
}
Expand Down
55 changes: 37 additions & 18 deletions internal/pkg/bcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@ import (

type Client struct {
defaultClient *ethclient.Client
wsClient *ethclient.Client
chainID *big.Int
}

func New(ctx context.Context, rpcURL string) (*Client, error) {
func New(ctx context.Context, wsURL, rpcURL string) (*Client, error) {
httpClient := &http.Client{
Timeout: 3 * time.Second,
Timeout: 10 * time.Second,
}
rpcClient, err := rpc.DialOptions(ctx, rpcURL, rpc.WithHTTPClient(httpClient))
ethClient, err := initEthClient(ctx, rpcURL, httpClient)
if err != nil {
return nil, err
}

ethClient := ethclient.NewClient(rpcClient)
wsClient, err := initEthClient(ctx, wsURL, httpClient)
if err != nil {
return nil, err
}

chainID, err := ethClient.ChainID(ctx)
if err != nil {
Expand All @@ -41,16 +45,27 @@ func New(ctx context.Context, rpcURL string) (*Client, error) {

return &Client{
defaultClient: ethClient,
wsClient: wsClient,
chainID: chainID,
}, nil
}

func initEthClient(ctx context.Context, url string, httpClient *http.Client) (*ethclient.Client, error) {
rpcClient, err := rpc.DialOptions(ctx, url, rpc.WithHTTPClient(httpClient))
if err != nil {
return nil, err
}

ethClient := ethclient.NewClient(rpcClient)
return ethClient, nil
}

func (c *Client) GetClient() *ethclient.Client {
return c.defaultClient
}

func (c *Client) SubscribeNewHead(ctx context.Context, newHeadCh chan<- *ethereumTypes.Header) (ethereum.Subscription, error) {
return c.defaultClient.SubscribeNewHead(ctx, newHeadCh)
return c.wsClient.SubscribeNewHead(ctx, newHeadCh)
}

func (c *Client) BlockNumber(ctx context.Context) (uint64, error) {
Expand All @@ -71,21 +86,25 @@ func (c *Client) HeaderAtBlockNumber(ctx context.Context, blockNo uint64) (*ethe
}

func (c *Client) GetLogs(ctx context.Context, blockHash common.Hash) ([]ethereumTypes.Log, error) {
timeOutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

query := ethereum.FilterQuery{
BlockHash: &blockHash,
}

// Get the logs
logs, err := c.defaultClient.FilterLogs(timeOutCtx, query)
if err != nil {
log.GetLogger().Errorw("Failed to retrieve logs", "blockHash", blockHash.Hex(), "err", err)
return nil, err
var err error
var logs []ethereumTypes.Log
for i := 0; i < 3; i++ {
query := ethereum.FilterQuery{
BlockHash: &blockHash,
}

// Get the logs
logs, err = c.defaultClient.FilterLogs(ctx, query)
if err != nil {
log.GetLogger().Errorw("Failed to retrieve logs", "err", err)
time.Sleep(5 * time.Second)
continue
}

return logs, nil
}

return logs, nil
return nil, err
}

func (c *Client) HeaderAtBlockHash(ctx context.Context, blockHash common.Hash) (*ethereumTypes.Header, error) {
Expand Down
61 changes: 61 additions & 0 deletions internal/pkg/erc20/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package erc20

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient"
"github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types"
)

var (
wsUrl = "ws://sepolia.rpc.tokamak.network:8546"
httpUrl = "https://sepolia.rpc.tokamak.network"
)

func Test_FetchTokenInfo(t *testing.T) {
ctx := context.Background()

type testCases []struct {
Expected types.Token
ContractAddress string
}

bcClient, err := bcclient.New(ctx, wsUrl, httpUrl)
require.NoError(t, err)

var tests = testCases{
{
Expected: types.Token{
Symbol: "TON",
Decimals: 18,
Address: strings.ToLower("0xa30fe40285B8f5c0457DbC3B7C8A280373c40044"),
},
ContractAddress: strings.ToLower("0xa30fe40285B8f5c0457DbC3B7C8A280373c40044"),
},
{
Expected: types.Token{
Symbol: "TOS",
Decimals: 18,
Address: strings.ToLower("0xFF3Ef745D9878AfE5934Ff0b130868AFDDbc58e8"),
},
ContractAddress: strings.ToLower("0xFF3Ef745D9878AfE5934Ff0b130868AFDDbc58e8"),
},
}
t.Parallel()
for _, test := range tests {
t.Run(test.ContractAddress, func(t *testing.T) {
tokenInfo, err := FetchTokenInfo(bcClient, test.ContractAddress)
require.NoError(t, err)

assert.NotEmpty(t, tokenInfo)
assert.Equal(t, test.Expected.Symbol, tokenInfo.Symbol)
assert.Equal(t, test.Expected.Decimals, tokenInfo.Decimals)
assert.Equal(t, test.ContractAddress, tokenInfo.Address)
})
}

}
55 changes: 29 additions & 26 deletions internal/pkg/listener/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/gob"
"fmt"
"math"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -139,9 +140,10 @@ func (s *EventService) Start(ctx context.Context) error {
return err
}

s.sub = event.ResubscribeErr(10, func(ctx context.Context, err error) (event.Subscription, error) {
s.sub = event.ResubscribeErr(5*time.Second, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
s.l.Errorw("Failed to re-subscribe the event", "err", err)
time.Sleep(1 * time.Second)
}

return s.subscribeNewHead(ctx)
Expand Down Expand Up @@ -315,7 +317,7 @@ func (s *EventService) syncOldBlocks(ctx context.Context, headCh chan *types.New

blocksNeedToConsume := onchainBlockNo - consumedBlockNo

totalBatches := int(math.Ceil(float64(blocksNeedToConsume) / float64(MaxBatchBlocksSize)))
totalBatches := calculateBatchBlocks(int(blocksNeedToConsume))

s.l.Infow("Total batches", "total", totalBatches)
skip := consumedBlockNo + 1
Expand Down Expand Up @@ -356,35 +358,32 @@ func (s *EventService) handleReorgBlocks(ctx context.Context, newHeader *ethereu
return nil, fmt.Errorf("reorged block numbers don't match")
}

var g errgroup.Group
reorgedBlocks := make([]*types.NewBlock, 0)
totalBatches := calculateBatchBlocks(len(newBlocks))

reorgedBlocks := make([]*types.NewBlock, len(newBlocks))
for i, newBlock := range newBlocks {
s.l.Infow("Detect reorg block", "block", newBlock.Number.Uint64())
i := i
newBlock := newBlock
s.l.Infow("Total batches", "total", totalBatches)
skip := newBlocks[0].Number.Uint64()
to := newBlocks[len(newBlocks)-1].Number.Uint64()
idx := uint64(0)
for i := 0; i < totalBatches; i++ {
fromBlock := skip
toBlock := skip + MaxBatchBlocksSize - 1

g.Go(func() error {
blockHash := newBlock.Hash()
reorgedLogs, errLogs := s.bcClient.GetLogs(ctx, blockHash)
if errLogs != nil {
s.l.Errorw("Failed to get logs", "err", errLogs)
return errLogs
}
if toBlock > to {
toBlock = to
}

reorgedBlocks[i] = &types.NewBlock{
Header: newBlock,
Logs: reorgedLogs,
ReorgedBlockHash: reorgedBlockHashes[i],
}
blocks, err := s.bcClient.GetBlocks(ctx, true, fromBlock, toBlock)
if err != nil {
return nil, err
}

return nil
})
}
for j, block := range blocks {
block.ReorgedBlockHash = reorgedBlockHashes[int(idx)+j]
}

err = g.Wait()
if err != nil {
return nil, err
idx += toBlock - fromBlock + 1
reorgedBlocks = append(reorgedBlocks, blocks...)
}

return reorgedBlocks, nil
Expand All @@ -394,3 +393,7 @@ func serializeEventRequestWithAddressAndABI(address common.Address, hashedABI co
result := fmt.Sprintf("%s:%s", address.String(), hashedABI)
return result
}

func calculateBatchBlocks(blocks int) int {
return int(math.Ceil(float64(blocks) / float64(MaxBatchBlocksSize)))
}
Loading
Loading