From 966dd7f6e31ebf26fda8e2a3c41f6fbf3ee244f4 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Mon, 24 Oct 2022 09:28:34 +0800 Subject: [PATCH] chain ws to synchronizer and streamer subscribe latest blk via tm, sync historical via http --- app/app.go | 91 ++++++++++++++++++++------ client/ws.go | 96 ++++++++++++++++++++++++++++ file/watcher.go | 26 ++++++-- go.mod | 2 +- integration_tests/test_query_only.py | 11 ++-- 5 files changed, 196 insertions(+), 30 deletions(-) create mode 100644 client/ws.go diff --git a/app/app.go b/app/app.go index e3f6f954f9..072fab19b7 100644 --- a/app/app.go +++ b/app/app.go @@ -14,6 +14,7 @@ import ( "github.com/crypto-org-chain/cronos/x/cronos/middleware" "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/server" "github.com/gorilla/mux" @@ -134,6 +135,7 @@ import ( cronoskeeper "github.com/crypto-org-chain/cronos/x/cronos/keeper" evmhandlers "github.com/crypto-org-chain/cronos/x/cronos/keeper/evmhandlers" cronostypes "github.com/crypto-org-chain/cronos/x/cronos/types" + tmtypes "github.com/tendermint/tendermint/types" // unnamed import of statik for swagger UI support _ "github.com/crypto-org-chain/cronos/client/docs/statik" @@ -413,12 +415,38 @@ func New( startBlockNum = 0 } nextBlockNum := int(startBlockNum) + 1 + // TODO: maxBlockNum init from primary node + maxBlockNum := 0 + interval := time.Second + directory := filepath.Join(rootDir, "data", "file_streamer") + // streamer write the file blk by blk with concurrency 1 + streamer := cronosfile.NewBlockFileWatcher(1, maxBlockNum, func(blockNum int) string { + return cronosfile.GetLocalDataFileName(directory, blockNum) + }, true) + streamer.Start(nextBlockNum, interval) + go func() { + chData, chErr := streamer.SubscribeData(), streamer.SubscribeError() + for { + select { + case data := <-chData: + pairs, err := cronosfile.DecodeData(data.Data) + fmt.Printf("mm-pairs: %+v, %+v\n", len(pairs), err) + if err == nil { + versionDB.PutAtVersion(int64(data.BlockNum), pairs) + } + case err := <-chErr: + // fail read + fmt.Println("mm-fail-read-panic") + panic(err) + } + } + }() + isLocal := cast.ToBool(appOpts.Get(cronosappclient.FlagIsLocal)) remoteUrl := cast.ToString(appOpts.Get(cronosappclient.FlagRemoteUrl)) concurrency := cast.ToInt(appOpts.Get(cronosappclient.FlagConcurrency)) - interval := time.Second - synchronizer := cronosfile.NewBlockFileWatcher(concurrency, func(blockNum int) string { + synchronizer := cronosfile.NewBlockFileWatcher(concurrency, maxBlockNum, func(blockNum int) string { return fmt.Sprintf("%s/%s", remoteUrl, cronosfile.DataFileName(blockNum)) }, isLocal) synchronizer.Start(nextBlockNum, interval) @@ -431,41 +459,62 @@ func New( select { case data := <-chData: file := cronosfile.GetLocalDataFileName(directory, data.BlockNum) - fmt.Printf("mm-file: %+v\n", file) + fmt.Printf("mm-data.BlockNum: %+v\n", data.BlockNum) if err := os.WriteFile(file, data.Data, 0644); err != nil { + fmt.Println("mm-WriteFile-panic") panic(err) } retry = 0 - // fmt.Printf("mm-pairs: %+v\n", len(pairs)) - // versionDB.PutAtVersion(int64(data.BlockNum), pairs) + fmt.Println("mm-reset-retry") + if data.BlockNum > maxBlockNum { + streamer.SetMaxBlockNum(data.BlockNum) + } case err := <-chErr: retry++ + fmt.Println("mm-retry", retry) if retry == maxRetry { // data corrupt + fmt.Println("mm-data-corrupt-panic") panic(err) } } } }() - // streamer write the file blk by blk with concurrency 1 - streamer := cronosfile.NewBlockFileWatcher(1, func(blockNum int) string { - return cronosfile.GetLocalDataFileName(directory, blockNum) - }, true) - streamer.Start(nextBlockNum, interval) go func() { - chData, chErr := streamer.SubscribeData(), streamer.SubscribeError() - for { - select { - case data := <-chData: - pairs, err := cronosfile.DecodeData(data.Data) - fmt.Printf("mm-pairs: %+v, %+v\n", len(pairs), err) - if err == nil { - versionDB.PutAtVersion(int64(data.BlockNum), pairs) + maxRetry := 50 + for i := 0; i < maxRetry; i++ { + if i > 0 { + time.Sleep(time.Second) + } + // TODO: config remote tm + wsClient := cronosappclient.NewWebsocketClient("ws://localhost:26767/websocket") + chResult, err := wsClient.Subscribe() + if err != nil { + fmt.Printf("mm-subscribed[%+v]: %+v\n", i, err) + continue + } + fmt.Println("subscribing") + err = wsClient.Send("subscribe", []string{ + "tm.event='NewBlockHeader'", + }) + if err != nil { + fmt.Printf("mm-subscribed: %+v\n", err) + continue + } + i = 0 + fmt.Println("subscribed ws") + for res := range chResult { + if res == nil { + continue } - case err := <-chErr: - // fail read - panic(err) + data, ok := res.Data.(tmtypes.EventDataNewBlockHeader) + if !ok { + continue + } + blockNum := int(data.Header.Height) + fmt.Printf("mm-set-max-blk: %+v\n", blockNum) + synchronizer.SetMaxBlockNum(blockNum) } } }() diff --git a/client/ws.go b/client/ws.go new file mode 100644 index 0000000000..edf098f2aa --- /dev/null +++ b/client/ws.go @@ -0,0 +1,96 @@ +package client + +import ( + "bytes" + "context" + "fmt" + "time" + + tmjson "github.com/tendermint/tendermint/libs/json" + coretypes "github.com/tendermint/tendermint/rpc/core/types" + types "github.com/tendermint/tendermint/rpc/jsonrpc/types" + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" +) + +type WebsocketClient struct { + url string + wsconn *websocket.Conn +} + +func NewWebsocketClient(url string) *WebsocketClient { + return &WebsocketClient{url: url} +} + +func (c *WebsocketClient) Subscribe() (<-chan *coretypes.ResultEvent, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + conn, _, err := websocket.Dial(ctx, c.url, nil) + if err != nil { + return nil, err + } + + c.wsconn = conn + conn.SetReadLimit(10240000) + + chResult := make(chan *coretypes.ResultEvent) + go func() { + defer close(chResult) + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + _, r, err := c.wsconn.Reader(ctx) + if err != nil { + cancel() + continue + } + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(r) + if err != nil { + continue + } + cancel() + bz := buf.Bytes() + res := new(types.RPCResponse) + err = tmjson.Unmarshal(bz, res) + if err != nil { + // fmt.Printf("mm-unmarshal-err: %+v\n", err) + // continue + fmt.Printf("mm-read-res-err: %+v\n", err) + chResult <- nil + break + } + ev := new(coretypes.ResultEvent) + if err := tmjson.Unmarshal(res.Result, &ev); err != nil { + fmt.Printf("mm-read-ev-err: %+v\n", err) + chResult <- nil + break + } + chResult <- ev + time.Sleep(time.Second) + } + }() + return chResult, nil +} + +func (c *WebsocketClient) Send( + method string, + params []string, +) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + return wsjson.Write(ctx, c.wsconn, map[string]interface{}{ + "jsonrpc": "2.0", + "method": method, + "params": params, + "id": 1, + }) +} + +func (c *WebsocketClient) Close() { + if c.wsconn != nil { + c.wsconn.Close(websocket.StatusNormalClosure, "") + c.wsconn = nil + } +} diff --git a/file/watcher.go b/file/watcher.go index 8bbf8c8e07..3d53fbf68b 100644 --- a/file/watcher.go +++ b/file/watcher.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "sync" + "sync/atomic" "time" ) @@ -60,6 +61,7 @@ type BlockData struct { type BlockFileWatcher struct { concurrency int + maxBlockNum int64 getFilePath func(blockNum int) string downloader fileDownloader chData chan *BlockData @@ -70,11 +72,13 @@ type BlockFileWatcher struct { func NewBlockFileWatcher( concurrency int, + maxBlockNum int, getFilePath func(blockNum int) string, isLocal bool, ) *BlockFileWatcher { w := &BlockFileWatcher{ concurrency: concurrency, + maxBlockNum: int64(maxBlockNum), getFilePath: getFilePath, chData: make(chan *BlockData), chError: make(chan error), @@ -104,11 +108,16 @@ func (w *BlockFileWatcher) SubscribeError() <-chan error { return w.chError } +func (w *BlockFileWatcher) SetMaxBlockNum(num int) { + atomic.StoreInt64(&w.maxBlockNum, int64(num)) +} + func (w *BlockFileWatcher) fetch(blockNum int) error { path := w.getFilePath(blockNum) f, err := os.Open(path) if err == nil { defer f.Close() + // valid 1st 8 bytes for downloaded file var bytes [8]byte if _, err = io.ReadFull(f, bytes[:]); err == nil { size := binary.BigEndian.Uint64(bytes[:]) @@ -119,7 +128,7 @@ func (w *BlockFileWatcher) fetch(blockNum int) error { f.Close() } - // TBC: skip if exist path to avoid dup download + // download if file not exist data, err := w.downloader.GetData(path) fmt.Printf("mm-fetch: %+v, %+v, %+v\n", blockNum, len(data), err) if err != nil { @@ -158,8 +167,18 @@ func (w *BlockFileWatcher) Start( default: wg := new(sync.WaitGroup) - resultErrs := make([]error, w.concurrency) - for i := 0; i < w.concurrency; i++ { + currentBlockNum := blockNum + maxBlockNum := int(atomic.LoadInt64(&w.maxBlockNum)) + concurrency := w.concurrency + if diff := maxBlockNum - currentBlockNum; diff < concurrency { + if diff <= 0 { + time.Sleep(interval) + break + } + concurrency = diff + } + resultErrs := make([]error, concurrency) + for i := 0; i < concurrency; i++ { nextBlockNum := blockNum + i fmt.Println("mm-start: ", nextBlockNum) if !finishedBlockNums[nextBlockNum] { @@ -173,7 +192,6 @@ func (w *BlockFileWatcher) Start( } wg.Wait() errReached := false - currentBlockNum := blockNum for i, err := range resultErrs { b := currentBlockNum + i if err != nil { diff --git a/go.mod b/go.mod index 26fa703003..a38e2e7b1d 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( google.golang.org/grpc v1.50.1 google.golang.org/protobuf v1.28.1 gopkg.in/yaml.v2 v2.4.0 + nhooyr.io/websocket v1.8.6 ) require ( @@ -191,7 +192,6 @@ require ( gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - nhooyr.io/websocket v1.8.6 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) diff --git a/integration_tests/test_query_only.py b/integration_tests/test_query_only.py index deaa4c018d..2c0d675bb3 100644 --- a/integration_tests/test_query_only.py +++ b/integration_tests/test_query_only.py @@ -73,6 +73,7 @@ def network(tmp_path_factory): procs.append(exec(base / "replica.jsonnet", path1, base_port1)) try: wait_for_port(ports.evmrpc_port(base_port0)) + wait_for_port(ports.evmrpc_ws_port(base_port0)) wait_for_port(ports.grpc_port(base_port1)) yield Network(Cronos(path0 / chain_id), Cronos(path1 / chain_id)) finally: @@ -87,10 +88,12 @@ def grpc_call(p, address): url = f"http://127.0.0.1:{p}/cosmos/bank/v1beta1/balances/{address}" response = requests.get(url) if not response.ok: - raise Exception( - f"response code: {response.status_code}, " - f"{response.reason}, {response.json()}" - ) + # retry until file get synced + return -1 + # raise Exception( + # f"response code: {response.status_code}, " + # f"{response.reason}, {response.json()}" + # ) result = response.json() if result.get("code"): raise Exception(result["raw_log"])