From 60e431e4eef069586be8cf2d1467aa993fce6120 Mon Sep 17 00:00:00 2001 From: yihuang Date: Thu, 22 Jun 2023 17:46:28 +0800 Subject: [PATCH] feat: make handshake cancelable (backport #857) (#1012) it'll make the handshake work with graceful shutdown(see: https://github.com/cosmos/cosmos-sdk/issues/16202) handshake could be a long running process if there are many local blocks to replay, for example we use it to do profiling. Hope we can backport this to 0.34.x. --- - [ ] Tests written/updated - [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments --- .../857-make-handshake-cancelable.md | 1 + consensus/replay.go | 30 +++++++++++++++++-- node/node.go | 22 ++++++++++++-- 3 files changed, 48 insertions(+), 5 deletions(-) create mode 100644 .changelog/unreleased/improvements/857-make-handshake-cancelable.md diff --git a/.changelog/unreleased/improvements/857-make-handshake-cancelable.md b/.changelog/unreleased/improvements/857-make-handshake-cancelable.md new file mode 100644 index 0000000000..16b447f6d2 --- /dev/null +++ b/.changelog/unreleased/improvements/857-make-handshake-cancelable.md @@ -0,0 +1 @@ +- `[node]` Make handshake cancelable ([cometbft/cometbft\#857](https://github.com/cometbft/cometbft/pull/857)) diff --git a/consensus/replay.go b/consensus/replay.go index b5d50baebb..7c6d55e1b8 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -2,6 +2,7 @@ package consensus import ( "bytes" + "context" "fmt" "hash/crc32" "io" @@ -239,6 +240,11 @@ func (h *Handshaker) NBlocks() int { // TODO: retry the handshake/replay if it fails ? func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { + return h.HandshakeWithContext(context.TODO(), proxyApp) +} + +// HandshakeWithContext is cancellable version of Handshake +func (h *Handshaker) HandshakeWithContext(ctx context.Context, proxyApp proxy.AppConns) error { // Handshake is done via ABCI Info on the query conn. res, err := proxyApp.Query().InfoSync(proxy.RequestInfo) @@ -265,7 +271,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { } // Replay blocks up to the latest in the blockstore. - _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) + appHash, err = h.ReplayBlocksWithContext(ctx, h.initialState, appHash, blockHeight, proxyApp) if err != nil { return fmt.Errorf("error on replay: %v", err) } @@ -286,6 +292,17 @@ func (h *Handshaker) ReplayBlocks( appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns, +) ([]byte, error) { + return h.ReplayBlocksWithContext(context.TODO(), state, appHash, appBlockHeight, proxyApp) +} + +// ReplayBlocksWithContext is cancellable version of ReplayBlocks. +func (h *Handshaker) ReplayBlocksWithContext( + ctx context.Context, + state sm.State, + appHash []byte, + appBlockHeight int64, + proxyApp proxy.AppConns, ) ([]byte, error) { storeBlockBase := h.store.Base() storeBlockHeight := h.store.Height() @@ -390,7 +407,7 @@ func (h *Handshaker) ReplayBlocks( // Either the app is asking for replay, or we're all synced up. if appBlockHeight < storeBlockHeight { // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false) + return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false) } else if appBlockHeight == storeBlockHeight { // We're good! @@ -405,7 +422,7 @@ func (h *Handshaker) ReplayBlocks( case appBlockHeight < stateBlockHeight: // the app is further behind than it should be, so replay blocks // but leave the last block to go through the WAL - return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true) + return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true) case appBlockHeight == stateBlockHeight: // We haven't run Commit (both the state and app are one block behind), @@ -435,6 +452,7 @@ func (h *Handshaker) ReplayBlocks( } func (h *Handshaker) replayBlocks( + ctx context.Context, state sm.State, proxyApp proxy.AppConns, appBlockHeight, @@ -461,6 +479,12 @@ func (h *Handshaker) replayBlocks( firstBlock = state.InitialHeight } for i := firstBlock; i <= finalBlock; i++ { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + h.logger.Info("Applying block", "height", i) block := h.store.LoadBlock(i) // Extra check to ensure the app was not changed in a way it shouldn't have. diff --git a/node/node.go b/node/node.go index 07212080a9..1de80c56e1 100644 --- a/node/node.go +++ b/node/node.go @@ -315,6 +315,7 @@ func createAndStartIndexerService( } func doHandshake( + ctx context.Context, stateStore sm.Store, state sm.State, blockStore sm.BlockStore, @@ -326,7 +327,7 @@ func doHandshake( handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc) handshaker.SetLogger(consensusLogger) handshaker.SetEventBus(eventBus) - if err := handshaker.Handshake(proxyApp); err != nil { + if err := handshaker.HandshakeWithContext(ctx, proxyApp); err != nil { return fmt.Errorf("error during handshake: %v", err) } return nil @@ -714,6 +715,23 @@ func NewNode(config *cfg.Config, metricsProvider MetricsProvider, logger log.Logger, options ...Option, +) (*Node, error) { + return NewNodeWithContext(context.TODO(), config, privValidator, + nodeKey, clientCreator, genesisDocProvider, dbProvider, + metricsProvider, logger, options...) +} + +// NewNodeWithContext is cancellable version of NewNode. +func NewNodeWithContext(ctx context.Context, + config *cfg.Config, + privValidator types.PrivValidator, + nodeKey *p2p.NodeKey, + clientCreator proxy.ClientCreator, + genesisDocProvider GenesisDocProvider, + dbProvider DBProvider, + metricsProvider MetricsProvider, + logger log.Logger, + options ...Option, ) (*Node, error) { blockStore, stateDB, err := initDBs(config, dbProvider) if err != nil { @@ -776,7 +794,7 @@ func NewNode(config *cfg.Config, // and replays any blocks as necessary to sync CometBFT with the app. consensusLogger := logger.With("module", "consensus") if !stateSync { - if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { + if err := doHandshake(ctx, stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { return nil, err }