Skip to content

Commit

Permalink
Merge pull request #12 from tokamak-network/feat/OR-1800-add-unit-tests
Browse files Browse the repository at this point in the history
feat: add unit tests for the event listener service and block keeper
  • Loading branch information
0x6e616d authored Aug 16, 2024
2 parents e87ec07 + ab18331 commit ad3cf24
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ on:
- "*"

env:
SERVICE: node-proxy
SERVICE: thanos-event-listener

jobs:
prepare:
Expand Down
16 changes: 16 additions & 0 deletions cmd/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (

const (
NetworkFlagName = "network"
L1HttpRpcUrlFlagName = "l1-http-rpc-url"
L1WsRpcUrlFlagName = "l1-ws-rpc"
L2WsRpcUrlFlagName = "l2-ws-rpc"
L2HttpRpcUrlFlagName = "l2-http-rpc"
L1StandardBridgeFlagName = "l1-standard-bridge-address"
L2StandardBridgeFlagName = "l2-standard-bridge-address"
L1UsdcBridgeFlagName = "l1-usdc-bridge-address"
Expand All @@ -27,6 +29,12 @@ var (
Usage: "Network name",
EnvVars: []string{"NETWORK"},
}
L1HttpRpcFlag = &cli.StringFlag{
Name: L1HttpRpcUrlFlagName,
Usage: "L1 HTTP RPC url",
Value: "http://localhost:8545",
EnvVars: []string{"L1_HTTP_RPC"},
}
L1WsRpcFlag = &cli.StringFlag{
Name: L1WsRpcUrlFlagName,
Usage: "L1 RPC url",
Expand All @@ -39,6 +47,12 @@ var (
Value: "ws://localhost:9546",
EnvVars: []string{"L2_WS_RPC"},
}
L2HttpRpcFlag = &cli.StringFlag{
Name: L2HttpRpcUrlFlagName,
Usage: "L2 HTTP RPC url",
Value: "http://localhost:9545",
EnvVars: []string{"L2_HTTP_RPC"},
}
L1StandardBridgeFlag = &cli.StringFlag{
Name: L1StandardBridgeFlagName,
Usage: "L1StandardBridge address",
Expand Down Expand Up @@ -97,7 +111,9 @@ func Flags() []cli.Flag {
return []cli.Flag{
NetworkFlag,
L1WsRpcFlag,
L1HttpRpcFlag,
L2WsRpcFlag,
L2HttpRpcFlag,
L1StandardBridgeFlag,
L2StandardBridgeFlag,
L1UsdcBridgeFlag,
Expand Down
2 changes: 2 additions & 0 deletions cmd/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func startListener(ctx *cli.Context) error {
config := &thanosnotif.Config{
Network: ctx.String(flags.NetworkFlagName),
L1WsRpc: ctx.String(flags.L1WsRpcUrlFlagName),
L1HttpRpc: ctx.String(flags.L1HttpRpcUrlFlagName),
L2WsRpc: ctx.String(flags.L2WsRpcUrlFlagName),
L2HttpRpc: ctx.String(flags.L2HttpRpcUrlFlagName),
L1StandardBridge: ctx.String(flags.L1StandardBridgeFlagName),
L2StandardBridge: ctx.String(flags.L2StandardBridgeFlagName),
L1UsdcBridge: ctx.String(flags.L1UsdcBridgeFlagName),
Expand Down
4 changes: 2 additions & 2 deletions internal/app/thanos-notif/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func New(ctx context.Context, cfg *Config) (*App, error) {
return nil, err
}

l1Client, err := bcclient.New(ctx, cfg.L1WsRpc)
l1Client, err := bcclient.New(ctx, cfg.L1WsRpc, cfg.L1HttpRpc)
if err != nil {
log.GetLogger().Errorw("Failed to create L1 client", "error", err)
return nil, err
}

l2Client, err := bcclient.New(ctx, cfg.L2WsRpc)
l2Client, err := bcclient.New(ctx, cfg.L2WsRpc, cfg.L2HttpRpc)
if err != nil {
log.GetLogger().Errorw("Failed to create L2 client", "error", err)
return nil, err
Expand Down
14 changes: 12 additions & 2 deletions internal/app/thanos-notif/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
type Config struct {
Network string

L1WsRpc string
L1HttpRpc string
L1WsRpc string

L2WsRpc string
L2HttpRpc string
L2WsRpc string

L1StandardBridge string
L2StandardBridge string
Expand All @@ -35,10 +37,18 @@ func (c *Config) Validate() error {
return errors.New("l1 ws rpc address is required")
}

if c.L1HttpRpc == "" {
return errors.New("l1 http rpc address is required")
}

if c.L2WsRpc == "" {
return errors.New("l2 ws rpc address is required")
}

if c.L2HttpRpc == "" {
return errors.New("l2 http rpc address is required")
}

if c.L1StandardBridge == "" {
return errors.New("l1 standard bridge is required")
}
Expand Down
55 changes: 37 additions & 18 deletions internal/pkg/bcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@ import (

type Client struct {
defaultClient *ethclient.Client
wsClient *ethclient.Client
chainID *big.Int
}

func New(ctx context.Context, rpcURL string) (*Client, error) {
func New(ctx context.Context, wsURL, rpcURL string) (*Client, error) {
httpClient := &http.Client{
Timeout: 3 * time.Second,
Timeout: 10 * time.Second,
}
rpcClient, err := rpc.DialOptions(ctx, rpcURL, rpc.WithHTTPClient(httpClient))
ethClient, err := initEthClient(ctx, rpcURL, httpClient)
if err != nil {
return nil, err
}

ethClient := ethclient.NewClient(rpcClient)
wsClient, err := initEthClient(ctx, wsURL, httpClient)
if err != nil {
return nil, err
}

chainID, err := ethClient.ChainID(ctx)
if err != nil {
Expand All @@ -41,16 +45,27 @@ func New(ctx context.Context, rpcURL string) (*Client, error) {

return &Client{
defaultClient: ethClient,
wsClient: wsClient,
chainID: chainID,
}, nil
}

func initEthClient(ctx context.Context, url string, httpClient *http.Client) (*ethclient.Client, error) {
rpcClient, err := rpc.DialOptions(ctx, url, rpc.WithHTTPClient(httpClient))
if err != nil {
return nil, err
}

ethClient := ethclient.NewClient(rpcClient)
return ethClient, nil
}

func (c *Client) GetClient() *ethclient.Client {
return c.defaultClient
}

func (c *Client) SubscribeNewHead(ctx context.Context, newHeadCh chan<- *ethereumTypes.Header) (ethereum.Subscription, error) {
return c.defaultClient.SubscribeNewHead(ctx, newHeadCh)
return c.wsClient.SubscribeNewHead(ctx, newHeadCh)
}

func (c *Client) BlockNumber(ctx context.Context) (uint64, error) {
Expand All @@ -71,21 +86,25 @@ func (c *Client) HeaderAtBlockNumber(ctx context.Context, blockNo uint64) (*ethe
}

func (c *Client) GetLogs(ctx context.Context, blockHash common.Hash) ([]ethereumTypes.Log, error) {
timeOutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

query := ethereum.FilterQuery{
BlockHash: &blockHash,
}

// Get the logs
logs, err := c.defaultClient.FilterLogs(timeOutCtx, query)
if err != nil {
log.GetLogger().Errorw("Failed to retrieve logs", "blockHash", blockHash.Hex(), "err", err)
return nil, err
var err error
var logs []ethereumTypes.Log
for i := 0; i < 3; i++ {
query := ethereum.FilterQuery{
BlockHash: &blockHash,
}

// Get the logs
logs, err = c.defaultClient.FilterLogs(ctx, query)
if err != nil {
log.GetLogger().Errorw("Failed to retrieve logs", "err", err)
time.Sleep(5 * time.Second)
continue
}

return logs, nil
}

return logs, nil
return nil, err
}

func (c *Client) HeaderAtBlockHash(ctx context.Context, blockHash common.Hash) (*ethereumTypes.Header, error) {
Expand Down
61 changes: 61 additions & 0 deletions internal/pkg/erc20/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package erc20

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/bcclient"
"github.com/tokamak-network/tokamak-thanos-event-listener/internal/pkg/types"
)

var (
wsUrl = "ws://sepolia.rpc.tokamak.network:8546"
httpUrl = "https://sepolia.rpc.tokamak.network"
)

func Test_FetchTokenInfo(t *testing.T) {
ctx := context.Background()

type testCases []struct {
Expected types.Token
ContractAddress string
}

bcClient, err := bcclient.New(ctx, wsUrl, httpUrl)
require.NoError(t, err)

var tests = testCases{
{
Expected: types.Token{
Symbol: "TON",
Decimals: 18,
Address: strings.ToLower("0xa30fe40285B8f5c0457DbC3B7C8A280373c40044"),
},
ContractAddress: strings.ToLower("0xa30fe40285B8f5c0457DbC3B7C8A280373c40044"),
},
{
Expected: types.Token{
Symbol: "TOS",
Decimals: 18,
Address: strings.ToLower("0xFF3Ef745D9878AfE5934Ff0b130868AFDDbc58e8"),
},
ContractAddress: strings.ToLower("0xFF3Ef745D9878AfE5934Ff0b130868AFDDbc58e8"),
},
}
t.Parallel()
for _, test := range tests {
t.Run(test.ContractAddress, func(t *testing.T) {
tokenInfo, err := FetchTokenInfo(bcClient, test.ContractAddress)
require.NoError(t, err)

assert.NotEmpty(t, tokenInfo)
assert.Equal(t, test.Expected.Symbol, tokenInfo.Symbol)
assert.Equal(t, test.Expected.Decimals, tokenInfo.Decimals)
assert.Equal(t, test.ContractAddress, tokenInfo.Address)
})
}

}
55 changes: 29 additions & 26 deletions internal/pkg/listener/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/gob"
"fmt"
"math"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -139,9 +140,10 @@ func (s *EventService) Start(ctx context.Context) error {
return err
}

s.sub = event.ResubscribeErr(10, func(ctx context.Context, err error) (event.Subscription, error) {
s.sub = event.ResubscribeErr(5*time.Second, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
s.l.Errorw("Failed to re-subscribe the event", "err", err)
time.Sleep(1 * time.Second)
}

return s.subscribeNewHead(ctx)
Expand Down Expand Up @@ -315,7 +317,7 @@ func (s *EventService) syncOldBlocks(ctx context.Context, headCh chan *types.New

blocksNeedToConsume := onchainBlockNo - consumedBlockNo

totalBatches := int(math.Ceil(float64(blocksNeedToConsume) / float64(MaxBatchBlocksSize)))
totalBatches := calculateBatchBlocks(int(blocksNeedToConsume))

s.l.Infow("Total batches", "total", totalBatches)
skip := consumedBlockNo + 1
Expand Down Expand Up @@ -356,35 +358,32 @@ func (s *EventService) handleReorgBlocks(ctx context.Context, newHeader *ethereu
return nil, fmt.Errorf("reorged block numbers don't match")
}

var g errgroup.Group
reorgedBlocks := make([]*types.NewBlock, 0)
totalBatches := calculateBatchBlocks(len(newBlocks))

reorgedBlocks := make([]*types.NewBlock, len(newBlocks))
for i, newBlock := range newBlocks {
s.l.Infow("Detect reorg block", "block", newBlock.Number.Uint64())
i := i
newBlock := newBlock
s.l.Infow("Total batches", "total", totalBatches)
skip := newBlocks[0].Number.Uint64()
to := newBlocks[len(newBlocks)-1].Number.Uint64()
idx := uint64(0)
for i := 0; i < totalBatches; i++ {
fromBlock := skip
toBlock := skip + MaxBatchBlocksSize - 1

g.Go(func() error {
blockHash := newBlock.Hash()
reorgedLogs, errLogs := s.bcClient.GetLogs(ctx, blockHash)
if errLogs != nil {
s.l.Errorw("Failed to get logs", "err", errLogs)
return errLogs
}
if toBlock > to {
toBlock = to
}

reorgedBlocks[i] = &types.NewBlock{
Header: newBlock,
Logs: reorgedLogs,
ReorgedBlockHash: reorgedBlockHashes[i],
}
blocks, err := s.bcClient.GetBlocks(ctx, true, fromBlock, toBlock)
if err != nil {
return nil, err
}

return nil
})
}
for j, block := range blocks {
block.ReorgedBlockHash = reorgedBlockHashes[int(idx)+j]
}

err = g.Wait()
if err != nil {
return nil, err
idx += toBlock - fromBlock + 1
reorgedBlocks = append(reorgedBlocks, blocks...)
}

return reorgedBlocks, nil
Expand All @@ -394,3 +393,7 @@ func serializeEventRequestWithAddressAndABI(address common.Address, hashedABI co
result := fmt.Sprintf("%s:%s", address.String(), hashedABI)
return result
}

func calculateBatchBlocks(blocks int) int {
return int(math.Ceil(float64(blocks) / float64(MaxBatchBlocksSize)))
}
Loading

0 comments on commit ad3cf24

Please sign in to comment.