From 575a00396f3224fcf0dfd919d0af715e52a62c11 Mon Sep 17 00:00:00 2001 From: mmsqe Date: Mon, 5 Dec 2022 09:52:36 +0800 Subject: [PATCH] fail early if incomplete allow retry if incomplete --- client/decoder.go | 13 +++++++++++-- client/file/sync.go | 24 +++++++++++++----------- client/file/watcher.go | 24 ++++++++++++++++++------ 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/client/decoder.go b/client/decoder.go index f850facdc4..884342fa7c 100644 --- a/client/decoder.go +++ b/client/decoder.go @@ -1,13 +1,22 @@ package client import ( + "fmt" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/gogo/protobuf/proto" ) func DecodeData(data []byte) (pairs []types.StoreKVPair, err error) { - offset := 8 - for offset < len(data) { + const prefixLen = 8 + offset := prefixLen + dataSize := sdk.BigEndianToUint64(data[:offset]) + size := len(data) + if int(dataSize)+prefixLen != size { + return nil, fmt.Errorf("incomplete file: %v vs %v", dataSize, size) + } + for offset < size { size, n := proto.DecodeVarint(data[offset:]) offset += n pair := new(types.StoreKVPair) diff --git a/client/file/sync.go b/client/file/sync.go index a5dd0acc5c..90845c57ba 100644 --- a/client/file/sync.go +++ b/client/file/sync.go @@ -96,12 +96,13 @@ func Sync(versionDB *tmdb.Store, remoteGrpcUrl, remoteUrl, remoteWsUrl, rootDir case data := <-chData: pairs, err := client.DecodeData(data.Data) fmt.Printf("mm-pairs: %+v, %+v\n", len(pairs), err) - if err == nil { - if err = versionDB.PutAtVersion(int64(data.BlockNum), pairs); err != nil { - fmt.Println("mm-put-at-version-panic") - panic(err) - } + if err != nil { + fmt.Println("invalid decode") + } else if err = versionDB.PutAtVersion(int64(data.BlockNum), pairs); err != nil { + fmt.Println("mm-put-at-version-panic") + panic(err) } + data.ChResult <- err case err := <-chErr: // fail read fmt.Println("mm-fail-read-panic") @@ -150,12 +151,14 @@ func Sync(versionDB *tmdb.Store, remoteGrpcUrl, remoteUrl, remoteWsUrl, rootDir if err := os.WriteFile(file, data.Data, 0600); err != nil { fmt.Println("mm-WriteFile-panic") panic(err) + } else { + retry = 0 + fmt.Println("mm-reset-retry") + if data.BlockNum > maxBlockNum { + streamer.SetMaxBlockNum(data.BlockNum) + } } - retry = 0 - fmt.Println("mm-reset-retry") - if data.BlockNum > maxBlockNum { - streamer.SetMaxBlockNum(data.BlockNum) - } + data.ChResult <- err case err := <-chErr: retry++ fmt.Println("mm-retry", retry) @@ -200,7 +203,6 @@ func Sync(versionDB *tmdb.Store, remoteGrpcUrl, remoteUrl, remoteWsUrl, rootDir blockNum := int(data.Header.Height) fmt.Printf("mm-set-max-blk: %+v\n", blockNum) synchronizer.SetMaxBlockNum(blockNum) - streamer.SetMaxBlockNum(blockNum) } } panic(fmt.Sprintf("max retries %d reached", defaultMaxRetry)) diff --git a/client/file/watcher.go b/client/file/watcher.go index 00526a779c..c1468bf5d1 100644 --- a/client/file/watcher.go +++ b/client/file/watcher.go @@ -8,7 +8,6 @@ import ( "os" "path/filepath" "sync" - "sync/atomic" "time" ) @@ -56,11 +55,12 @@ func (d *httpFileDownloader) GetData(path string) ([]byte, error) { type BlockData struct { BlockNum int Data []byte + ChResult chan<- error } type BlockFileWatcher struct { concurrency int - maxBlockNum int64 + maxBlockNum int getFilePath func(blockNum int) string onBeforeFetch func(blockNum int) bool downloader fileDownloader @@ -68,6 +68,7 @@ type BlockFileWatcher struct { chError chan error chDone chan bool startLock *sync.Mutex + maxBlockLock *sync.RWMutex } func NewBlockFileWatcher( @@ -79,12 +80,13 @@ func NewBlockFileWatcher( ) *BlockFileWatcher { w := &BlockFileWatcher{ concurrency: concurrency, - maxBlockNum: int64(maxBlockNum), + maxBlockNum: maxBlockNum, getFilePath: getFilePath, onBeforeFetch: onBeforeFetch, chData: make(chan *BlockData), chError: make(chan error), startLock: new(sync.Mutex), + maxBlockLock: new(sync.RWMutex), } if isLocal { w.downloader = new(localFileDownloader) @@ -111,7 +113,12 @@ func (w *BlockFileWatcher) SubscribeError() <-chan error { } func (w *BlockFileWatcher) SetMaxBlockNum(num int) { - atomic.StoreInt64(&w.maxBlockNum, int64(num)) + w.maxBlockLock.Lock() + defer w.maxBlockLock.Unlock() + // avoid dup job when set max to smaller one while locking + if num > w.maxBlockNum { + w.maxBlockNum = num + } } func (w *BlockFileWatcher) fetch(blockNum int) error { @@ -131,11 +138,14 @@ func (w *BlockFileWatcher) fetch(blockNum int) error { } return err } + + chResult := make(chan error) w.chData <- &BlockData{ BlockNum: blockNum, Data: data, + ChResult: chResult, } - return nil + return <-chResult } func (w *BlockFileWatcher) Start( @@ -158,7 +168,9 @@ func (w *BlockFileWatcher) Start( default: wg := new(sync.WaitGroup) currentBlockNum := blockNum - maxBlockNum := int(atomic.LoadInt64(&w.maxBlockNum)) + w.maxBlockLock.RLock() + maxBlockNum := w.maxBlockNum + w.maxBlockLock.RUnlock() concurrency := w.concurrency if diff := maxBlockNum - currentBlockNum; diff < concurrency { if diff <= 0 {