diff --git a/abci/example/dummyapp/main.go b/abci/example/dummyapp/main.go index c62e884d37..a9eb04bbd1 100644 --- a/abci/example/dummyapp/main.go +++ b/abci/example/dummyapp/main.go @@ -10,7 +10,6 @@ import ( cmd "github.com/celestiaorg/celestia-core/cmd/tendermint/commands" "github.com/celestiaorg/celestia-core/cmd/tendermint/commands/debug" cfg "github.com/celestiaorg/celestia-core/config" - "github.com/celestiaorg/celestia-core/ipfs" "github.com/celestiaorg/celestia-core/libs/cli" "github.com/celestiaorg/celestia-core/libs/log" "github.com/celestiaorg/celestia-core/node" @@ -78,11 +77,7 @@ func main() { } // DummyNode implements NodeProvider. -func DummyNode(config *cfg.Config, provider ipfs.NodeProvider, logger log.Logger) (*node.Node, error) { - if err := ipfs.InitRepo(config.IPFS.Path(), logger); err != nil { - return nil, fmt.Errorf("failed to initialize IPFS repo at path %s: %v", config.IPFS.Path(), err) - } - +func DummyNode(config *cfg.Config, logger log.Logger) (*node.Node, error) { nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) if err != nil { return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err) @@ -103,7 +98,6 @@ func DummyNode(config *cfg.Config, provider ipfs.NodeProvider, logger log.Logger proxy.NewLocalClientCreator(app), node.DefaultGenesisDocProviderFunc(config), node.DefaultDBProvider, - provider, node.DefaultMetricsProvider(config.Instrumentation), logger, ) diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index f30a40a109..55bde3b247 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -1,7 +1,6 @@ package v0 import ( - "context" "fmt" "reflect" "time" @@ -179,10 +178,7 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest, src p2p.Peer) (queued bool) { - block, err := bcR.store.LoadBlock(context.TODO(), msg.Height) - if err != nil { - panic(err) - } + block := bcR.store.LoadBlock(msg.Height) if block != nil { bl, err := block.ToProto() if err != nil { @@ -422,14 +418,11 @@ FOR_LOOP: bcR.pool.PopRequest() // TODO: batch saves so we dont persist to disk every block - err := bcR.store.SaveBlock(context.TODO(), first, firstParts, second.LastCommit) - if err != nil { - // an error is only returned if something with the local IPFS blockstore is seriously wrong - panic(err) - } + bcR.store.SaveBlock(first, firstParts, second.LastCommit) // TODO: same thing for app - but we would need a way to get the hash // without persisting the state. + var err error state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first) if err != nil { // TODO This is bad, are we zombie? diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index cf774e95f0..cd9825fda1 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -1,7 +1,6 @@ package v0 import ( - "context" "crypto/sha256" "fmt" "os" @@ -69,9 +68,10 @@ func newBlockchainReactor( panic(fmt.Errorf("error start app: %w", err)) } + blockDB := memdb.NewDB() stateDB := memdb.NewDB() stateStore := sm.NewStore(stateDB) - blockStore := store.MockBlockStore(nil) + blockStore := store.NewBlockStore(blockDB) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) if err != nil { @@ -99,10 +99,7 @@ func newBlockchainReactor( lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil) if blockHeight > 1 { lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) - lastBlock, err := blockStore.LoadBlock(context.TODO(), blockHeight-1) - if err != nil { - panic(err) - } + lastBlock := blockStore.LoadBlock(blockHeight - 1) vote, err := types.MakeVote( lastBlock.Header.Height, @@ -129,10 +126,7 @@ func newBlockchainReactor( panic(fmt.Errorf("error apply block: %w", err)) } - err := blockStore.SaveBlock(context.TODO(), thisBlock, thisParts, lastCommit) - if err != nil { - panic(err) - } + blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) @@ -189,10 +183,7 @@ func TestNoBlockResponse(t *testing.T) { assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) for _, tt := range tests { - block, err := reactorPairs[1].reactor.store.LoadBlock(context.TODO(), tt.height) - if err != nil { - panic(err) - } + block := reactorPairs[1].reactor.store.LoadBlock(tt.height) if tt.existent { assert.True(t, block != nil) } else { diff --git a/cmd/tendermint/commands/init.go b/cmd/tendermint/commands/init.go index 6f0ace24c3..aa35f1efac 100644 --- a/cmd/tendermint/commands/init.go +++ b/cmd/tendermint/commands/init.go @@ -6,7 +6,6 @@ import ( "github.com/spf13/cobra" cfg "github.com/celestiaorg/celestia-core/config" - "github.com/celestiaorg/celestia-core/ipfs" tmos "github.com/celestiaorg/celestia-core/libs/os" tmrand "github.com/celestiaorg/celestia-core/libs/rand" "github.com/celestiaorg/celestia-core/p2p" @@ -99,6 +98,5 @@ func initFilesWithConfig(config *cfg.Config) error { logger.Info("Generated genesis file", "path", genFile) } - // TODO(ismail): add counter part in ResetAllCmd - return ipfs.InitRepo(config.IPFS.Path(), logger) + return nil } diff --git a/cmd/tendermint/commands/run_node.go b/cmd/tendermint/commands/run_node.go index 02e4f757b2..361af54362 100644 --- a/cmd/tendermint/commands/run_node.go +++ b/cmd/tendermint/commands/run_node.go @@ -10,14 +10,12 @@ import ( "github.com/spf13/cobra" cfg "github.com/celestiaorg/celestia-core/config" - "github.com/celestiaorg/celestia-core/ipfs" tmos "github.com/celestiaorg/celestia-core/libs/os" nm "github.com/celestiaorg/celestia-core/node" ) var ( genesisHash []byte - initIPFS bool ) // AddNodeFlags exposes some common configuration options on the command-line @@ -91,22 +89,6 @@ func AddNodeFlags(cmd *cobra.Command) { config.DBPath, "database directory") - cmd.Flags().String( - "ipfs.repo-path", - config.IPFS.RepoPath, - "custom IPFS repository path. Defaults to `.{RootDir}/ipfs`", - ) - cmd.Flags().Bool( - "ipfs.serve-api", - config.IPFS.ServeAPI, - "set this to expose IPFS API(useful for debugging)", - ) - cmd.Flags().BoolVar( - &initIPFS, - "ipfs.init", - false, - "set this to initialize repository for embedded IPFS node. Flag is ignored if repo is already initialized", - ) } // NewRunNodeCmd returns the command that allows the CLI to start a node. @@ -123,7 +105,6 @@ func NewRunNodeCmd(nodeProvider nm.Provider) *cobra.Command { n, err := nodeProvider( config, - ipfs.Embedded(initIPFS, config.IPFS, logger), logger, ) if err != nil { diff --git a/config/config.go b/config/config.go index fbaf69c7fb..d10c23dabd 100644 --- a/config/config.go +++ b/config/config.go @@ -8,8 +8,6 @@ import ( "os" "path/filepath" "time" - - "github.com/celestiaorg/celestia-core/ipfs" ) const ( @@ -67,8 +65,6 @@ type Config struct { Consensus *ConsensusConfig `mapstructure:"consensus"` TxIndex *TxIndexConfig `mapstructure:"tx-index"` Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"` - // Options for IPFS service - IPFS *ipfs.Config `mapstructure:"ipfs"` } // DefaultConfig returns a default configuration for a Tendermint node @@ -83,7 +79,6 @@ func DefaultConfig() *Config { Consensus: DefaultConsensusConfig(), TxIndex: DefaultTxIndexConfig(), Instrumentation: DefaultInstrumentationConfig(), - IPFS: ipfs.DefaultConfig(), } } @@ -99,7 +94,6 @@ func TestConfig() *Config { Consensus: TestConsensusConfig(), TxIndex: TestTxIndexConfig(), Instrumentation: TestInstrumentationConfig(), - IPFS: TetsIpfsConfig(), } } @@ -110,7 +104,6 @@ func (cfg *Config) SetRoot(root string) *Config { cfg.P2P.RootDir = root cfg.Mempool.RootDir = root cfg.Consensus.RootDir = root - cfg.IPFS.RootDir = root return cfg } @@ -847,7 +840,7 @@ func TestConsensusConfig() *ConsensusConfig { cfg.TimeoutProposeDelta = 20 * time.Millisecond cfg.TimeoutPrevote = 80 * time.Millisecond cfg.TimeoutPrevoteDelta = 20 * time.Millisecond - cfg.TimeoutPrecommit = 160 * time.Millisecond + cfg.TimeoutPrecommit = 80 * time.Millisecond cfg.TimeoutPrecommitDelta = 20 * time.Millisecond // NOTE: when modifying, make sure to update time_iota_ms (testGenesisFmt) in toml.go cfg.TimeoutCommit = 80 * time.Millisecond @@ -1009,10 +1002,6 @@ func DefaultInstrumentationConfig() *InstrumentationConfig { } } -func TetsIpfsConfig() *ipfs.Config { - return ipfs.DefaultConfig() -} - // TestInstrumentationConfig returns a default configuration for metrics // reporting. func TestInstrumentationConfig() *InstrumentationConfig { diff --git a/config/toml.go b/config/toml.go index 48ae268159..664c0182a3 100644 --- a/config/toml.go +++ b/config/toml.go @@ -430,15 +430,6 @@ max-open-connections = {{ .Instrumentation.MaxOpenConnections }} # Instrumentation namespace namespace = "{{ .Instrumentation.Namespace }}" - -####################################################### -### IPFS Configuration Options ### -####################################################### -[ipfs] - -# IPFS related configuration -repo-path = "{{ .IPFS.RepoPath}}" -serve-api = "{{ .IPFS.ServeAPI}}" ` /****** these are for test settings ***********/ diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 0934785e37..9d53eff4d3 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -9,16 +9,12 @@ import ( "testing" "time" - "github.com/ipfs/go-blockservice" - offline "github.com/ipfs/go-ipfs-exchange-offline" - "github.com/ipfs/go-merkledag" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" abcicli "github.com/celestiaorg/celestia-core/abci/client" abci "github.com/celestiaorg/celestia-core/abci/types" "github.com/celestiaorg/celestia-core/evidence" - "github.com/celestiaorg/celestia-core/ipfs" "github.com/celestiaorg/celestia-core/libs/db/memdb" "github.com/celestiaorg/celestia-core/libs/log" "github.com/celestiaorg/celestia-core/libs/service" @@ -59,9 +55,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { app.InitChain(abci.RequestInitChain{Validators: vals}) blockDB := memdb.NewDB() - bs := ipfs.MockBlockStore() - dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger()) + blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus mtx := new(tmsync.Mutex) @@ -84,7 +78,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, - mempool, dag, ipfs.MockRouting(), evpool) + mempool, evpool) cs.SetLogger(cs.Logger) // set private validator pv := privVals[i] diff --git a/consensus/common_test.go b/consensus/common_test.go index b00468ccf5..01aa4e09e8 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -14,11 +14,6 @@ import ( "time" "github.com/go-kit/kit/log/term" - "github.com/ipfs/go-blockservice" - offline "github.com/ipfs/go-ipfs-exchange-offline" - format "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/require" abcicli "github.com/celestiaorg/celestia-core/abci/client" @@ -27,7 +22,6 @@ import ( abci "github.com/celestiaorg/celestia-core/abci/types" cfg "github.com/celestiaorg/celestia-core/config" cstypes "github.com/celestiaorg/celestia-core/consensus/types" - "github.com/celestiaorg/celestia-core/ipfs" tmbytes "github.com/celestiaorg/celestia-core/libs/bytes" dbm "github.com/celestiaorg/celestia-core/libs/db" "github.com/celestiaorg/celestia-core/libs/db/memdb" @@ -53,11 +47,11 @@ const ( // test. type cleanupFunc func() -// genesis, chain_id, priv_val, ipfsAPI +// genesis, chain_id, priv_val var ( config *cfg.Config // NOTE: must be reset for each _test.go file consensusReplayConfig *cfg.Config - ensureTimeout = 4 * time.Second + ensureTimeout = 2 * time.Second ) func ensureDir(dir string, mode os.FileMode) { @@ -357,7 +351,7 @@ func subscribeToVoter(cs *State, addr []byte) <-chan tmpubsub.Message { //------------------------------------------------------------------------------- // consensus states -func newState(state sm.State, pv types.PrivValidator, app abci.Application, ipfsDagAPI format.DAGService) *State { +func newState(state sm.State, pv types.PrivValidator, app abci.Application) *State { config := cfg.ResetTestRoot("consensus_state_test") return newStateWithConfig(config, state, pv, app) } @@ -380,9 +374,7 @@ func newStateWithConfigAndBlockStore( blockDB dbm.DB, ) *State { // Get BlockStore - bs := ipfs.MockBlockStore() - dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - blockStore := store.NewBlockStore(blockDB, bs, log.TestingLogger()) + blockStore := store.NewBlockStore(blockDB) // one for mempool, one for consensus mtx := new(tmsync.Mutex) @@ -406,7 +398,7 @@ func newStateWithConfigAndBlockStore( } blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) - cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, dag, ipfs.MockRouting(), evpool) + cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetPrivValidator(pv) @@ -438,7 +430,7 @@ func randState(nValidators int) (*State, []*validatorStub) { vss := make([]*validatorStub, nValidators) - cs := newState(state, privVals[0], counter.NewApplication(true), mdutils.Mock()) + cs := newState(state, privVals[0], counter.NewApplication(true)) for i := 0; i < nValidators; i++ { vss[i] = newValidatorStub(privVals[i], int32(i)) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index a0c668bdb5..8e428c8c98 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -23,7 +22,6 @@ import ( cstypes "github.com/celestiaorg/celestia-core/consensus/types" cryptoenc "github.com/celestiaorg/celestia-core/crypto/encoding" "github.com/celestiaorg/celestia-core/crypto/tmhash" - "github.com/celestiaorg/celestia-core/ipfs" "github.com/celestiaorg/celestia-core/libs/bits" "github.com/celestiaorg/celestia-core/libs/bytes" "github.com/celestiaorg/celestia-core/libs/db/memdb" @@ -155,8 +153,9 @@ func TestReactorWithEvidence(t *testing.T) { // duplicate code from: // css[i] = newStateWithConfig(thisConfig, state, privVals[i], app) - dag := mdutils.Mock() - blockStore := store.MockBlockStore(nil) + blockDB := memdb.NewDB() + blockStore := store.NewBlockStore(blockDB) + // one for mempool, one for consensus mtx := new(tmsync.Mutex) proxyAppConnMem := abcicli.NewLocalClient(mtx, app) @@ -184,7 +183,7 @@ func TestReactorWithEvidence(t *testing.T) { // Make State blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool) cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, - mempool, dag, ipfs.MockRouting(), evpool2) + mempool, evpool2) cs.SetLogger(log.TestingLogger().With("module", "consensus")) cs.SetPrivValidator(pv) @@ -671,7 +670,7 @@ func timeoutWaitGroup(t *testing.T, n int, f func(int), css []*State) { // we're running many nodes in-process, possibly in in a virtual machine, // and spewing debug messages - making a block could take a while, - timeout := time.Minute * 8 + timeout := time.Minute * 4 select { case <-done: diff --git a/consensus/replay.go b/consensus/replay.go index e01845abf6..1bc77f9e51 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -463,10 +463,7 @@ func (h *Handshaker) replayBlocks( } for i := firstBlock; i <= finalBlock; i++ { h.logger.Info("Applying block", "height", i) - block, err := h.store.LoadBlock(context.TODO(), i) - if err != nil { - return nil, err - } + block := h.store.LoadBlock(i) // Extra check to ensure the app was not changed in a way it shouldn't have. if len(appHash) > 0 { assertAppHashEqualsOneFromBlock(appHash, block) @@ -495,10 +492,7 @@ func (h *Handshaker) replayBlocks( // ApplyBlock on the proxyApp with the last block. func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) { - block, err := h.store.LoadBlock(context.TODO(), height) - if err != nil { - return sm.State{}, err - } + block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) // Use stubs for both mempool and evidence pool since no transactions nor @@ -506,6 +500,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap blockExec := sm.NewBlockExecutor(h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}) blockExec.SetEventBus(h.eventBus) + var err error state, _, err = blockExec.ApplyBlock(state, meta.BlockID, block) if err != nil { return sm.State{}, err diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 5d5682f41a..5361480990 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -11,7 +11,6 @@ import ( "strings" cfg "github.com/celestiaorg/celestia-core/config" - "github.com/celestiaorg/celestia-core/ipfs" "github.com/celestiaorg/celestia-core/libs/db/badgerdb" "github.com/celestiaorg/celestia-core/libs/log" tmos "github.com/celestiaorg/celestia-core/libs/os" @@ -19,7 +18,6 @@ import ( sm "github.com/celestiaorg/celestia-core/state" "github.com/celestiaorg/celestia-core/store" "github.com/celestiaorg/celestia-core/types" - mdutils "github.com/ipfs/go-merkledag/test" ) const ( @@ -131,7 +129,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error pb.cs.Wait() newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec, - pb.cs.blockStore, pb.cs.txNotifier, mdutils.Mock(), ipfs.MockRouting(), pb.cs.evpool) + pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool) newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -290,8 +288,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo if err != nil { tmos.Exit(err.Error()) } - dag := mdutils.Mock() - blockStore := store.MockBlockStore(blockStoreDB) + blockStore := store.NewBlockStore(blockStoreDB) // Get State stateDB, err := badgerdb.NewDB("state", config.DBDir()) @@ -332,7 +329,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) consensusState := NewState(csConfig, state.Copy(), blockExec, - blockStore, mempool, dag, ipfs.MockRouting(), evpool) + blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) return consensusState } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index a6888d3c45..3fa63b5d74 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -14,8 +14,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - format "github.com/ipfs/go-ipld-format" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -131,7 +129,8 @@ func TestWALCrash(t *testing.T) { heightToStop int64 }{ {"empty block", - func(stateDB dbm.DB, cs *State, ctx context.Context) {}, + func(stateDB dbm.DB, cs *State, ctx context.Context) { + }, 1}, {"many non-empty blocks", func(stateDB dbm.DB, cs *State, ctx context.Context) { @@ -546,9 +545,7 @@ func TestSimulateValidatorsChange(t *testing.T) { sim.Chain = make([]*types.Block, 0) sim.Commits = make([]*types.Commit, 0) for i := 1; i <= numBlocks; i++ { - blck, err := css[0].blockStore.LoadBlock(context.TODO(), int64(i)) - require.NoError(t, err) - sim.Chain = append(sim.Chain, blck) + sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i))) sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i))) } } @@ -1182,28 +1179,25 @@ func stateAndStore( // mock block store type mockBlockStore struct { - config *cfg.Config - params tmproto.ConsensusParams - chain []*types.Block - commits []*types.Commit - base int64 - ipfsDagAPI format.DAGService + config *cfg.Config + params tmproto.ConsensusParams + chain []*types.Block + commits []*types.Commit + base int64 } // TODO: NewBlockStore(db.NewMemDB) ... func newMockBlockStore(config *cfg.Config, params tmproto.ConsensusParams) *mockBlockStore { - return &mockBlockStore{config, params, nil, nil, 0, mdutils.Mock()} + return &mockBlockStore{config, params, nil, nil, 0} } -func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) } -func (bs *mockBlockStore) Base() int64 { return bs.base } -func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 } -func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) } -func (bs *mockBlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { - return bs.chain[height-1], nil -} -func (bs *mockBlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) { - return bs.chain[int64(len(bs.chain))-1], nil +func (bs *mockBlockStore) Height() int64 { return int64(len(bs.chain)) } +func (bs *mockBlockStore) Base() int64 { return bs.base } +func (bs *mockBlockStore) Size() int64 { return bs.Height() - bs.Base() + 1 } +func (bs *mockBlockStore) LoadBaseMeta() *types.BlockMeta { return bs.LoadBlockMeta(bs.base) } +func (bs *mockBlockStore) LoadBlock(height int64) *types.Block { return bs.chain[height-1] } +func (bs *mockBlockStore) LoadBlockByHash(hash []byte) *types.Block { + return bs.chain[int64(len(bs.chain))-1] } func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { block := bs.chain[height-1] @@ -1213,13 +1207,7 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { } } func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } -func (bs *mockBlockStore) SaveBlock( - ctx context.Context, - block *types.Block, - blockParts *types.PartSet, - seenCommit *types.Commit, -) error { - return nil +func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { } func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return bs.commits[height-1] diff --git a/consensus/state.go b/consensus/state.go index 659ae9d7a0..d52933324c 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -2,7 +2,6 @@ package consensus import ( "bytes" - "context" "errors" "fmt" "io/ioutil" @@ -12,8 +11,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - format "github.com/ipfs/go-ipld-format" - "github.com/libp2p/go-libp2p-core/routing" cfg "github.com/celestiaorg/celestia-core/config" cstypes "github.com/celestiaorg/celestia-core/consensus/types" @@ -95,9 +92,6 @@ type State struct { // store blocks and commits blockStore sm.BlockStore - dag format.DAGService - croute routing.ContentRouting - // create and execute blocks blockExec *sm.BlockExecutor @@ -165,8 +159,6 @@ func NewState( blockExec *sm.BlockExecutor, blockStore sm.BlockStore, txNotifier txNotifier, - dag format.DAGService, - croute routing.ContentRouting, evpool evidencePool, options ...StateOption, ) *State { @@ -174,8 +166,6 @@ func NewState( config: config, blockExec: blockExec, blockStore: blockStore, - dag: dag, - croute: croute, txNotifier: txNotifier, peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), @@ -1545,10 +1535,7 @@ func (cs *State) finalizeCommit(height int64) { // but may differ from the LastCommit included in the next block precommits := cs.Votes.Precommits(cs.CommitRound) seenCommit := precommits.MakeCommit() - err := cs.blockStore.SaveBlock(context.TODO(), block, blockParts, seenCommit) - if err != nil { - panic(err) - } + cs.blockStore.SaveBlock(block, blockParts, seenCommit) } else { // Happens during replay if we already saved the block but didn't commit cs.Logger.Info("Calling finalizeCommit on already stored block", "height", block.Height) diff --git a/consensus/state_test.go b/consensus/state_test.go index 568c1990d5..683931d995 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -640,7 +639,7 @@ func TestStateLockPOLRelock(t *testing.T) { signAddVotes(cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) // before we timeout to the new round set the new proposal - cs2 := newState(cs1.state, vs2, counter.NewApplication(true), mdutils.Mock()) + cs2 := newState(cs1.state, vs2, counter.NewApplication(true)) prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1) if prop == nil || propBlock == nil { t.Fatal("Failed to create proposal block with vs2") @@ -826,7 +825,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) { signAddVotes(cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) // before we timeout to the new round set the new proposal - cs2 := newState(cs1.state, vs2, counter.NewApplication(true), mdutils.Mock()) + cs2 := newState(cs1.state, vs2, counter.NewApplication(true)) prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1) if prop == nil || propBlock == nil { t.Fatal("Failed to create proposal block with vs2") @@ -870,7 +869,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) { signAddVotes(cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4) // before we timeout to the new round set the new proposal - cs3 := newState(cs1.state, vs3, counter.NewApplication(true), mdutils.Mock()) + cs3 := newState(cs1.state, vs3, counter.NewApplication(true)) prop, propBlock = decideProposal(cs3, vs3, vs3.Height, vs3.Round+1) if prop == nil || propBlock == nil { t.Fatal("Failed to create proposal block with vs2") diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 75778d3902..3e1f65652e 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - mdutils "github.com/ipfs/go-merkledag/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,7 +18,6 @@ import ( cfg "github.com/celestiaorg/celestia-core/config" "github.com/celestiaorg/celestia-core/consensus/types" "github.com/celestiaorg/celestia-core/crypto/merkle" - "github.com/celestiaorg/celestia-core/ipfs" "github.com/celestiaorg/celestia-core/libs/autofile" "github.com/celestiaorg/celestia-core/libs/db/memdb" "github.com/celestiaorg/celestia-core/libs/log" @@ -66,7 +64,7 @@ func TestWALTruncate(t *testing.T) { err = walGenerateNBlocks(t, wal.Group(), 60) require.NoError(t, err) - time.Sleep(5 * time.Millisecond) // wait groupCheckDuration, make sure RotateFile run + time.Sleep(1 * time.Millisecond) // wait groupCheckDuration, make sure RotateFile run if err := wal.FlushAndSync(); err != nil { t.Error(err) @@ -310,8 +308,8 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { if err = stateStore.Save(state); err != nil { t.Error(err) } - dag := mdutils.Mock() - blockStore := store.MockBlockStore(blockStoreDB) + + blockStore := store.NewBlockStore(blockStoreDB) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app)) proxyApp.SetLogger(logger.With("module", "proxy")) @@ -339,7 +337,7 @@ func walGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) require.NoError(t, err) consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, - mempool, dag, ipfs.MockRouting(), evpool) + mempool, evpool) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 55623c6538..ebf0dd3344 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -1,7 +1,6 @@ package evidence_test import ( - "context" "os" "testing" "time" @@ -396,7 +395,7 @@ func initializeValidatorState(privVal types.PrivValidator, height int64) sm.Stor // initializeBlockStore creates a block storage and populates it w/ a dummy // block at +height+. func initializeBlockStore(db dbm.DB, state sm.State, valAddr []byte) *store.BlockStore { - blockStore := store.MockBlockStore(db) + blockStore := store.NewBlockStore(db) for i := int64(1); i <= state.LastBlockHeight; i++ { lastCommit := makeCommit(i-1, valAddr) @@ -408,10 +407,7 @@ func initializeBlockStore(db dbm.DB, state sm.State, valAddr []byte) *store.Bloc partSet := block.MakePartSet(parts) seenCommit := makeCommit(i, valAddr) - err := blockStore.SaveBlock(context.TODO(), block, partSet, seenCommit) - if err != nil { - panic(err) - } + blockStore.SaveBlock(block, partSet, seenCommit) } return blockStore diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index ee6862f917..dc84b62b5a 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -130,7 +130,7 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) { pools[0].Update(state, evList) require.EqualValues(t, uint32(0), pools[0].Size()) - time.Sleep(200 * time.Millisecond) + time.Sleep(100 * time.Millisecond) peer := reactors[0].Switch.Peers().List()[0] ps := peerState{height - 2} @@ -141,7 +141,7 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) { peer.Set(types.PeerStateKey, ps) // wait to see that no evidence comes through - time.Sleep(600 * time.Millisecond) + time.Sleep(300 * time.Millisecond) // the second pool should not have received any evidence because it has already been committed assert.Equal(t, uint32(0), pools[1].Size(), "second reactor should not have received evidence") @@ -157,7 +157,7 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) { } // wait to see that only one evidence is sent - time.Sleep(600 * time.Millisecond) + time.Sleep(300 * time.Millisecond) // the second pool should only have received the first evidence because it is behind peerEv, _ := pools[1].PendingEvidence(10000) @@ -178,9 +178,9 @@ func TestReactorsGossipNoCommittedEvidence(t *testing.T) { peer.Set(types.PeerStateKey, ps) // wait to see that only two evidence is sent - time.Sleep(1800 * time.Millisecond) + time.Sleep(300 * time.Millisecond) - peerEv, _ = pools[1].PendingEvidence(2000) + peerEv, _ = pools[1].PendingEvidence(1000) assert.EqualValues(t, []types.Evidence{evList[0], evList[1]}, peerEv) } diff --git a/go.mod b/go.mod index 6cf863b7c6..462f12daca 100644 --- a/go.mod +++ b/go.mod @@ -28,10 +28,9 @@ require ( github.com/ipfs/go-ipfs-api v0.2.0 github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-config v0.11.0 - github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipld-format v0.2.0 - github.com/ipfs/go-merkledag v0.3.2 + github.com/ipfs/go-merkledag v0.3.2 // indirect github.com/ipfs/go-path v0.0.9 // indirect github.com/ipfs/go-verifcid v0.0.1 github.com/ipfs/interface-go-ipfs-core v0.4.0 diff --git a/ipfs/mock.go b/ipfs/mock.go index 40a8ff36e5..45e66e542e 100644 --- a/ipfs/mock.go +++ b/ipfs/mock.go @@ -3,9 +3,6 @@ package ipfs import ( "context" - ds "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" - blockstore "github.com/ipfs/go-ipfs-blockstore" nilrouting "github.com/ipfs/go-ipfs-routing/none" "github.com/ipfs/go-ipfs/core" coremock "github.com/ipfs/go-ipfs/core/mock" @@ -46,7 +43,3 @@ func MockRouting() routing.Routing { croute, _ := nilrouting.ConstructNilRouting(context.TODO(), nil, nil, nil) return croute } - -func MockBlockStore() blockstore.Blockstore { - return blockstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) -} diff --git a/node/node.go b/node/node.go index f548ae3e2d..3082c939f2 100644 --- a/node/node.go +++ b/node/node.go @@ -5,15 +5,12 @@ import ( "context" "errors" "fmt" - "io" "net" "net/http" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port "strings" "time" - ipld "github.com/ipfs/go-ipld-format" - "github.com/libp2p/go-libp2p-core/routing" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" @@ -24,7 +21,6 @@ import ( cs "github.com/celestiaorg/celestia-core/consensus" "github.com/celestiaorg/celestia-core/crypto" "github.com/celestiaorg/celestia-core/evidence" - "github.com/celestiaorg/celestia-core/ipfs" dbm "github.com/celestiaorg/celestia-core/libs/db" "github.com/celestiaorg/celestia-core/libs/db/badgerdb" tmjson "github.com/celestiaorg/celestia-core/libs/json" @@ -87,12 +83,12 @@ func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider { } // Provider takes a config and a logger and returns a ready to go Node. -type Provider func(*cfg.Config, ipfs.NodeProvider, log.Logger) (*Node, error) +type Provider func(*cfg.Config, log.Logger) (*Node, error) // DefaultNewNode returns a Tendermint node with default settings for the // PrivValidator, ClientCreator, GenesisDoc, and DBProvider. // It implements NodeProvider. -func DefaultNewNode(config *cfg.Config, ipfs ipfs.NodeProvider, logger log.Logger) (*Node, error) { +func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile()) if err != nil { return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err) @@ -109,7 +105,6 @@ func DefaultNewNode(config *cfg.Config, ipfs ipfs.NodeProvider, logger log.Logge proxy.DefaultClientCreator(config.ProxyApp, config.DBDir()), DefaultGenesisDocProviderFunc(config), DefaultDBProvider, - ipfs, DefaultMetricsProvider(config.Instrumentation), logger, ) @@ -215,8 +210,6 @@ type Node struct { txIndexer txindex.TxIndexer indexerService *txindex.IndexerService prometheusSrv *http.Server - - ipfsClose io.Closer } func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) { @@ -369,8 +362,7 @@ func createBlockchainReactor(config *cfg.Config, return bcReactor, nil } -func createConsensusReactor( - config *cfg.Config, +func createConsensusReactor(config *cfg.Config, state sm.State, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, @@ -380,8 +372,6 @@ func createConsensusReactor( csMetrics *cs.Metrics, waitSync bool, eventBus *types.EventBus, - dag ipld.DAGService, - croute routing.ContentRouting, consensusLogger log.Logger) (*cs.Reactor, *cs.State) { consensusState := cs.NewState( @@ -390,8 +380,6 @@ func createConsensusReactor( blockExec, blockStore, mempool, - dag, - croute, evidencePool, cs.StateMetrics(csMetrics), ) @@ -620,7 +608,6 @@ func NewNode(config *cfg.Config, clientCreator proxy.ClientCreator, genesisDocProvider GenesisDocProvider, dbProvider DBProvider, - ipfsProvider ipfs.NodeProvider, metricsProvider MetricsProvider, logger log.Logger, options ...Option) (*Node, error) { @@ -685,12 +672,7 @@ func NewNode(config *cfg.Config, return nil, err } - ipfsNode, err := ipfsProvider() - if err != nil { - return nil, err - } - - blockStore := store.NewBlockStore(blockStoreDB, ipfsNode.Blockstore, logger) + blockStore := store.NewBlockStore(blockStoreDB) // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, // and replays any blocks as necessary to sync tendermint with the app. @@ -751,7 +733,7 @@ func NewNode(config *cfg.Config, } consensusReactor, consensusState := createConsensusReactor( config, state, blockExec, blockStore, mempool, evidencePool, - privValidator, csMetrics, stateSync || fastSync, eventBus, ipfsNode.DAG, ipfsNode.Routing, consensusLogger, + privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, ) // Set up state sync reactor, and schedule a sync if requested. @@ -852,7 +834,6 @@ func NewNode(config *cfg.Config, txIndexer: txIndexer, indexerService: indexerService, eventBus: eventBus, - ipfsClose: ipfsNode, } node.BaseService = *service.NewBaseService(logger, "Node", node) @@ -996,10 +977,6 @@ func (n *Node) OnStop() { n.Logger.Error("Prometheus HTTP server Shutdown", "err", err) } } - - if err := n.ipfsClose.Close(); err != nil { - n.Logger.Error("ipfsClose.Close()", err) - } } // ConfigureRPC makes sure RPC has all the objects it needs to operate. @@ -1402,8 +1379,8 @@ func createAndStartPrivValidatorSocketClient( } const ( - retries = 50 // 50 * 200ms = 10s total - timeout = 200 * time.Millisecond + retries = 50 // 50 * 100ms = 5s total + timeout = 100 * time.Millisecond ) pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout) diff --git a/node/node_test.go b/node/node_test.go index 59de04dac2..df3ecabf97 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -19,7 +19,6 @@ import ( "github.com/celestiaorg/celestia-core/crypto/ed25519" "github.com/celestiaorg/celestia-core/crypto/tmhash" "github.com/celestiaorg/celestia-core/evidence" - "github.com/celestiaorg/celestia-core/ipfs" dbm "github.com/celestiaorg/celestia-core/libs/db" "github.com/celestiaorg/celestia-core/libs/db/memdb" "github.com/celestiaorg/celestia-core/libs/log" @@ -52,7 +51,6 @@ func defaultNewTestNode(config *cfg.Config, logger log.Logger) (*Node, error) { proxy.DefaultClientCreator(config.ProxyApp, config.DBDir()), DefaultGenesisDocProviderFunc(config), InMemDBProvider, - ipfs.Mock(), DefaultMetricsProvider(config.Instrumentation), logger, ) @@ -218,7 +216,7 @@ func TestNodeSetPrivValIPC(t *testing.T) { log.TestingLogger(), dialer, ) - privval.SignerDialerEndpointTimeoutReadWrite(400 * time.Millisecond)(dialerEndpoint) + privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint) pvsc := privval.NewSignerServer( dialerEndpoint, @@ -283,7 +281,7 @@ func TestCreateProposalBlock(t *testing.T) { // Make EvidencePool evidenceDB := memdb.NewDB() - blockStore := store.MockBlockStore(nil) + blockStore := store.NewBlockStore(memdb.NewDB()) evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) require.NoError(t, err) evidencePool.SetLogger(logger) @@ -539,7 +537,6 @@ func TestNodeNewNodeCustomReactors(t *testing.T) { proxy.DefaultClientCreator(config.ProxyApp, config.DBDir()), DefaultGenesisDocProviderFunc(config), InMemDBProvider, - ipfs.Mock(), DefaultMetricsProvider(config.Instrumentation), log.TestingLogger(), CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKCHAIN": customBlockchainReactor}), diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 24236e73f6..20a97fdb7a 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -89,10 +89,7 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) return nil, err } - block, err := env.BlockStore.LoadBlock(ctx.Context(), height) - if err != nil { - return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, err - } + block := env.BlockStore.LoadBlock(height) blockMeta := env.BlockStore.LoadBlockMeta(height) if blockMeta == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil @@ -103,10 +100,7 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) // BlockByHash gets block by hash. // More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { - block, err := env.BlockStore.LoadBlockByHash(ctx.Context(), hash) - if err != nil { - return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, err - } + block := env.BlockStore.LoadBlockByHash(hash) if block == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil } @@ -151,14 +145,9 @@ func DataAvailabilityHeader(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.Re // TODO: store DAHeader to avoid loading the full block each time // depends on either: - // - https://github.com/celestiaorg/celestia-core/pull/312, or - // - https://github.com/celestiaorg/celestia-core/pull/218 - block, err := env.BlockStore.LoadBlock(ctx.Context(), height) - if err != nil { - return &ctypes.ResultDataAvailabilityHeader{ - DataAvailabilityHeader: types.DataAvailabilityHeader{}, - }, err - } + // - https://github.com/lazyledger/lazyledger-core/pull/312, or + // - https://github.com/lazyledger/lazyledger-core/pull/218 + block := env.BlockStore.LoadBlock(height) _ = block.Hash() dah := block.DataAvailabilityHeader return &ctypes.ResultDataAvailabilityHeader{ diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index d4e68f0666..9998969a75 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -1,7 +1,6 @@ package core import ( - "context" "fmt" "testing" @@ -119,26 +118,16 @@ type mockBlockStore struct { height int64 } -func (mockBlockStore) Base() int64 { return 1 } -func (store mockBlockStore) Height() int64 { return store.height } -func (store mockBlockStore) Size() int64 { return store.height } -func (mockBlockStore) LoadBaseMeta() *types.BlockMeta { return nil } -func (mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { return nil } -func (mockBlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { - return nil, nil -} -func (mockBlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) { - return nil, nil -} +func (mockBlockStore) Base() int64 { return 1 } +func (store mockBlockStore) Height() int64 { return store.height } +func (store mockBlockStore) Size() int64 { return store.height } +func (mockBlockStore) LoadBaseMeta() *types.BlockMeta { return nil } +func (mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta { return nil } +func (mockBlockStore) LoadBlock(height int64) *types.Block { return nil } +func (mockBlockStore) LoadBlockByHash(hash []byte) *types.Block { return nil } func (mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil } func (mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return nil } func (mockBlockStore) LoadSeenCommit(height int64) *types.Commit { return nil } func (mockBlockStore) PruneBlocks(height int64) (uint64, error) { return 0, nil } -func (mockBlockStore) SaveBlock( - ctx context.Context, - block *types.Block, - blockParts *types.PartSet, - seenCommit *types.Commit, -) error { - return nil +func (mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { } diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 817cd56eb1..0d2b6c21bb 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -37,10 +37,7 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error var proof types.TxProof if prove { - block, err := env.BlockStore.LoadBlock(ctx.Context(), height) - if err != nil { - return nil, err - } + block := env.BlockStore.LoadBlock(height) proof = block.Data.Txs.Proof(int(index)) // XXX: overflow on 32-bit machines } @@ -110,10 +107,7 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, pagePtr, perPageP var proof types.TxProof if prove { - block, err := env.BlockStore.LoadBlock(ctx.Context(), r.Height) - if err != nil { - return nil, err - } + block := env.BlockStore.LoadBlock(r.Height) proof = block.Data.Txs.Proof(int(r.Index)) // XXX: overflow on 32-bit machines } diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 981cbfd855..b2e8233abe 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -10,7 +10,6 @@ import ( abci "github.com/celestiaorg/celestia-core/abci/types" cfg "github.com/celestiaorg/celestia-core/config" - "github.com/celestiaorg/celestia-core/ipfs" "github.com/celestiaorg/celestia-core/libs/log" tmnet "github.com/celestiaorg/celestia-core/libs/net" nm "github.com/celestiaorg/celestia-core/node" @@ -176,7 +175,6 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node { node, err := nm.NewNode(config, pv, nodeKey, papp, nm.DefaultGenesisDocProviderFunc(config), nm.InMemDBProvider, - ipfs.Mock(), nm.DefaultMetricsProvider(config.Instrumentation), logger, ) diff --git a/state/services.go b/state/services.go index 0ca6a4e935..6dd03fad17 100644 --- a/state/services.go +++ b/state/services.go @@ -1,8 +1,6 @@ package state import ( - "context" - "github.com/celestiaorg/celestia-core/types" ) @@ -22,13 +20,13 @@ type BlockStore interface { LoadBaseMeta() *types.BlockMeta LoadBlockMeta(height int64) *types.BlockMeta - LoadBlock(ctx context.Context, height int64) (*types.Block, error) + LoadBlock(height int64) *types.Block - SaveBlock(ctx context.Context, block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) error + SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) PruneBlocks(height int64) (uint64, error) - LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) + LoadBlockByHash(hash []byte) *types.Block LoadBlockPart(height int64, index int) *types.Part LoadBlockCommit(height int64) *types.Commit diff --git a/store/mock.go b/store/mock.go deleted file mode 100644 index 2c2e7e90cc..0000000000 --- a/store/mock.go +++ /dev/null @@ -1,21 +0,0 @@ -package store - -import ( - "github.com/celestiaorg/celestia-core/ipfs" - dbm "github.com/celestiaorg/celestia-core/libs/db" - "github.com/celestiaorg/celestia-core/libs/db/memdb" - "github.com/celestiaorg/celestia-core/libs/log" -) - -// MockBlockStore returns a mocked blockstore. a nil db will result in a new in memory db -func MockBlockStore(db dbm.DB) *BlockStore { - if db == nil { - db = memdb.NewDB() - } - - return NewBlockStore( - db, - ipfs.MockBlockStore(), - log.NewNopLogger(), - ) -} diff --git a/store/store.go b/store/store.go index 4e9be36cf9..69955470b6 100644 --- a/store/store.go +++ b/store/store.go @@ -1,28 +1,16 @@ package store import ( - "context" "fmt" - "strings" - "strconv" "github.com/gogo/protobuf/proto" - "github.com/ipfs/go-blockservice" - blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" - format "github.com/ipfs/go-ipld-format" - "github.com/ipfs/go-merkledag" - "github.com/celestiaorg/celestia-core/ipfs" dbm "github.com/celestiaorg/celestia-core/libs/db" - "github.com/celestiaorg/celestia-core/libs/log" tmsync "github.com/celestiaorg/celestia-core/libs/sync" - "github.com/celestiaorg/celestia-core/p2p/ipld" tmstore "github.com/celestiaorg/celestia-core/proto/tendermint/store" tmproto "github.com/celestiaorg/celestia-core/proto/tendermint/types" "github.com/celestiaorg/celestia-core/types" - "github.com/celestiaorg/rsmt2d" ) /* @@ -53,21 +41,16 @@ type BlockStore struct { mtx tmsync.RWMutex base int64 height int64 - - dag format.DAGService - logger log.Logger } // NewBlockStore returns a new BlockStore with the given DB, // initialized to the last height that was committed to the DB. -func NewBlockStore(db dbm.DB, bstore blockstore.Blockstore, logger log.Logger) *BlockStore { +func NewBlockStore(db dbm.DB) *BlockStore { bs := LoadBlockStoreState(db) return &BlockStore{ base: bs.Base, height: bs.Height, db: db, - dag: merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore))), - logger: logger, } } @@ -107,43 +90,48 @@ func (bs *BlockStore) LoadBaseMeta() *types.BlockMeta { // LoadBlock returns the block with the given height. // If no block is found for that height, it returns nil. -func (bs *BlockStore) LoadBlock(ctx context.Context, height int64) (*types.Block, error) { - blockMeta := bs.LoadBlockMeta(height) +func (bs *BlockStore) LoadBlock(height int64) *types.Block { + var blockMeta = bs.LoadBlockMeta(height) if blockMeta == nil { - return nil, nil + return nil } - lastCommit := bs.LoadBlockCommit(height - 1) - - data, err := ipld.RetrieveBlockData(ctx, &blockMeta.DAHeader, bs.dag, rsmt2d.NewRSGF8Codec()) - if err != nil { - if strings.Contains(err.Error(), format.ErrNotFound.Error()) { - return nil, fmt.Errorf("failure to retrieve block data from local ipfs store: %w", err) + pbb := new(tmproto.Block) + buf := []byte{} + for i := 0; i < int(blockMeta.BlockID.PartSetHeader.Total); i++ { + part := bs.LoadBlockPart(height, i) + // If the part is missing (e.g. since it has been deleted after we + // loaded the block meta) we consider the whole block to be missing. + if part == nil { + return nil } - bs.logger.Info("failure to retrieve block data", err) - return nil, err + buf = append(buf, part.Bytes...) + } + err := proto.Unmarshal(buf, pbb) + if err != nil { + // NOTE: The existence of meta should imply the existence of the + // block. So, make sure meta is only saved after blocks are saved. + panic(fmt.Sprintf("Error reading block: %v", err)) } - block := types.Block{ - Header: blockMeta.Header, - Data: data, - DataAvailabilityHeader: blockMeta.DAHeader, - LastCommit: lastCommit, + block, err := types.BlockFromProto(pbb) + if err != nil { + panic(fmt.Errorf("error from proto block: %w", err)) } - return &block, nil + return block } // LoadBlockByHash returns the block with the given hash. // If no block is found for that hash, it returns nil. // Panics if it fails to parse height associated with the given hash. -func (bs *BlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types.Block, error) { +func (bs *BlockStore) LoadBlockByHash(hash []byte) *types.Block { bz, err := bs.db.Get(calcBlockHashKey(hash)) if err != nil { panic(err) } if len(bz) == 0 { - return nil, nil + return nil } s := string(bz) @@ -152,7 +140,7 @@ func (bs *BlockStore) LoadBlockByHash(ctx context.Context, hash []byte) (*types. if err != nil { panic(fmt.Sprintf("failed to extract height from %s: %v", s, err)) } - return bs.LoadBlock(ctx, height) + return bs.LoadBlock(height) } // LoadBlockPart returns the Part at the given index @@ -340,12 +328,7 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { // If all the nodes restart after committing a block, // we need this to reload the precommits to catch-up nodes to the // most recent height. Otherwise they'd stall at H-1. -func (bs *BlockStore) SaveBlock( - ctx context.Context, - block *types.Block, - blockParts *types.PartSet, - seenCommit *types.Commit, -) error { +func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { if block == nil { panic("BlockStore can only save a non-nil block") } @@ -369,11 +352,6 @@ func (bs *BlockStore) SaveBlock( bs.saveBlockPart(height, i, part) } - err := ipld.PutBlock(ctx, bs.dag, block, ipfs.MockRouting(), bs.logger) - if err != nil { - return err - } - // Save block meta blockMeta := types.NewBlockMeta(block, blockParts) pbm, err := blockMeta.ToProto() @@ -416,8 +394,6 @@ func (bs *BlockStore) SaveBlock( // Save new BlockStoreState descriptor. This also flushes the database. bs.saveState() - - return nil } func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) { diff --git a/store/store_test.go b/store/store_test.go index 39c89e7e02..0a4659fe48 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2,7 +2,6 @@ package store import ( "bytes" - "context" "crypto/sha256" "fmt" "os" @@ -74,7 +73,7 @@ func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore, cleanupFu if err != nil { panic(fmt.Errorf("error constructing state from genesis file: %w", err)) } - return state, MockBlockStore(blockDB), func() { os.RemoveAll(config.RootDir) } + return state, NewBlockStore(blockDB), func() { os.RemoveAll(config.RootDir) } } func TestLoadBlockStoreState(t *testing.T) { @@ -106,7 +105,7 @@ func TestNewBlockStore(t *testing.T) { bz, _ := proto.Marshal(&bss) err := db.Set(blockStoreKey, bz) require.NoError(t, err) - bs := MockBlockStore(db) + bs := NewBlockStore(db) require.Equal(t, int64(100), bs.Base(), "failed to properly parse blockstore") require.Equal(t, int64(10000), bs.Height(), "failed to properly parse blockstore") @@ -124,7 +123,7 @@ func TestNewBlockStore(t *testing.T) { _, _, panicErr := doFn(func() (interface{}, error) { err := db.Set(blockStoreKey, tt.data) require.NoError(t, err) - _ = MockBlockStore(db) + _ = NewBlockStore(db) return nil, nil }) require.NotNil(t, panicErr, "#%d panicCauser: %q expected a panic", i, tt.data) @@ -133,13 +132,13 @@ func TestNewBlockStore(t *testing.T) { err = db.Set(blockStoreKey, []byte{}) require.NoError(t, err) - bs = MockBlockStore(db) + bs = NewBlockStore(db) assert.Equal(t, bs.Height(), int64(0), "expecting empty bytes to be unmarshaled alright") } func freshBlockStore() (*BlockStore, dbm.DB) { db := memdb.NewDB() - return MockBlockStore(db), db + return NewBlockStore(db), db } var ( @@ -172,12 +171,10 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { require.Equal(t, bs.Base(), int64(0), "initially the base should be zero") require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") - ctx := context.TODO() - // check there are no blocks at various heights noBlockHeights := []int64{0, -1, 100, 1000, 2} for i, height := range noBlockHeights { - if g, _ := bs.LoadBlock(ctx, height); g != nil { + if g := bs.LoadBlock(height); g != nil { t.Errorf("#%d: height(%d) got a block; want nil", i, height) } } @@ -186,14 +183,13 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { block := makeBlock(bs.Height()+1, state, new(types.Commit)) validPartSet := block.MakePartSet(2) seenCommit := makeTestCommit(10, tmtime.Now()) - err := bs.SaveBlock(ctx, block, partSet, seenCommit) - require.NoError(t, err) + bs.SaveBlock(block, partSet, seenCommit) require.EqualValues(t, 1, bs.Base(), "expecting the new height to be changed") require.EqualValues(t, block.Header.Height, bs.Height(), "expecting the new height to be changed") incompletePartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 2}) uncontiguousPartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 0}) - _, err = uncontiguousPartSet.AddPart(part2) + _, err := uncontiguousPartSet.AddPart(part2) require.Error(t, err) header1 := types.Header{ @@ -308,20 +304,16 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) { bs, db := freshBlockStore() // SaveBlock res, err, panicErr := doFn(func() (interface{}, error) { - err := bs.SaveBlock(ctx, tuple.block, tuple.parts, tuple.seenCommit) + bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit) if tuple.block == nil { return nil, nil } - if err != nil { - return nil, err - } if tuple.corruptBlockInDB { err := db.Set(calcBlockMetaKey(tuple.block.Height), []byte("block-bogus")) require.NoError(t, err) } - bBlock, err := bs.LoadBlock(ctx, tuple.block.Height) - require.NoError(t, err) + bBlock := bs.LoadBlock(tuple.block.Height) bBlockMeta := bs.LoadBlockMeta(tuple.block.Height) if tuple.eraseSeenCommitInDB { @@ -388,14 +380,13 @@ func TestLoadBaseMeta(t *testing.T) { stateStore := sm.NewStore(memdb.NewDB()) state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) - bs := MockBlockStore(nil) + bs := NewBlockStore(memdb.NewDB()) for h := int64(1); h <= 10; h++ { block := makeBlock(h, state, new(types.Commit)) partSet := block.MakePartSet(2) seenCommit := makeTestCommit(h, tmtime.Now()) - err := bs.SaveBlock(context.TODO(), block, partSet, seenCommit) - require.NoError(t, err) + bs.SaveBlock(block, partSet, seenCommit) } _, err = bs.PruneBlocks(4) @@ -446,13 +437,11 @@ func TestPruneBlocks(t *testing.T) { state, err := stateStore.LoadFromDBOrGenesisFile(config.GenesisFile()) require.NoError(t, err) db := memdb.NewDB() - bs := MockBlockStore(db) + bs := NewBlockStore(db) assert.EqualValues(t, 0, bs.Base()) assert.EqualValues(t, 0, bs.Height()) assert.EqualValues(t, 0, bs.Size()) - ctx := context.TODO() - // pruning an empty store should error, even when pruning to 0 _, err = bs.PruneBlocks(1) require.Error(t, err) @@ -465,16 +454,14 @@ func TestPruneBlocks(t *testing.T) { block := makeBlock(h, state, new(types.Commit)) partSet := block.MakePartSet(2) seenCommit := makeTestCommit(h, tmtime.Now()) - err := bs.SaveBlock(ctx, block, partSet, seenCommit) - require.NoError(t, err) + bs.SaveBlock(block, partSet, seenCommit) } assert.EqualValues(t, 1, bs.Base()) assert.EqualValues(t, 1500, bs.Height()) assert.EqualValues(t, 1500, bs.Size()) - prunedBlock, err := bs.LoadBlock(ctx, 1199) - require.NoError(t, err) + prunedBlock := bs.LoadBlock(1199) // Check that basic pruning works pruned, err := bs.PruneBlocks(1200) @@ -488,29 +475,18 @@ func TestPruneBlocks(t *testing.T) { Height: 1500, }, LoadBlockStoreState(db)) - b, err := bs.LoadBlock(ctx, 1200) - require.NotNil(t, b) - require.NoError(t, err) - b, err = bs.LoadBlock(ctx, 1199) - require.NoError(t, err) - require.Nil(t, b) - b, err = bs.LoadBlockByHash(ctx, prunedBlock.Hash()) - require.Nil(t, b) - require.NoError(t, err) - + require.NotNil(t, bs.LoadBlock(1200)) + require.Nil(t, bs.LoadBlock(1199)) + require.Nil(t, bs.LoadBlockByHash(prunedBlock.Hash())) require.Nil(t, bs.LoadBlockCommit(1199)) require.Nil(t, bs.LoadBlockMeta(1199)) require.Nil(t, bs.LoadBlockPart(1199, 1)) for i := int64(1); i < 1200; i++ { - b, err := bs.LoadBlock(ctx, i) - require.Nil(t, b) - require.NoError(t, err) + require.Nil(t, bs.LoadBlock(i)) } for i := int64(1200); i <= 1500; i++ { - b, err := bs.LoadBlock(ctx, i) - require.NotNil(t, b) - require.NoError(t, err) + require.NotNil(t, bs.LoadBlock(i)) } // Pruning below the current base should error @@ -536,15 +512,9 @@ func TestPruneBlocks(t *testing.T) { pruned, err = bs.PruneBlocks(1500) require.NoError(t, err) assert.EqualValues(t, 200, pruned) - b, err = bs.LoadBlock(ctx, 1499) - assert.Nil(t, b) - require.NoError(t, err) - b, err = bs.LoadBlock(ctx, 1500) - assert.NotNil(t, b) - require.NoError(t, err) - b, err = bs.LoadBlock(ctx, 1501) - assert.Nil(t, b) - require.NoError(t, err) + assert.Nil(t, bs.LoadBlock(1499)) + assert.NotNil(t, bs.LoadBlock(1500)) + assert.Nil(t, bs.LoadBlock(1501)) } func TestLoadBlockMeta(t *testing.T) { @@ -590,7 +560,6 @@ func TestLoadBlockMeta(t *testing.T) { } func TestBlockFetchAtHeight(t *testing.T) { - ctx := context.TODO() state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) defer cleanup() require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") @@ -598,12 +567,10 @@ func TestBlockFetchAtHeight(t *testing.T) { partSet := block.MakePartSet(2) seenCommit := makeTestCommit(10, tmtime.Now()) - err := bs.SaveBlock(ctx, block, partSet, seenCommit) - require.NoError(t, err) + bs.SaveBlock(block, partSet, seenCommit) require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed") - blockAtHeight, err := bs.LoadBlock(ctx, bs.Height()) - require.NoError(t, err) + blockAtHeight := bs.LoadBlock(bs.Height()) b1, err := block.ToProto() require.NoError(t, err) b2, err := blockAtHeight.ToProto() @@ -614,11 +581,9 @@ func TestBlockFetchAtHeight(t *testing.T) { require.Equal(t, block.Hash(), blockAtHeight.Hash(), "expecting a successful load of the last saved block") - blockAtHeightPlus1, err := bs.LoadBlock(ctx, bs.Height()+1) - require.NoError(t, err) + blockAtHeightPlus1 := bs.LoadBlock(bs.Height() + 1) require.Nil(t, blockAtHeightPlus1, "expecting an unsuccessful load of Height()+1") - blockAtHeightPlus2, err := bs.LoadBlock(ctx, bs.Height()+2) - require.NoError(t, err) + blockAtHeightPlus2 := bs.LoadBlock(bs.Height() + 2) require.Nil(t, blockAtHeightPlus2, "expecting an unsuccessful load of Height()+2") } diff --git a/test/e2e/app/main.go b/test/e2e/app/main.go index 3bc274ac88..cd91686215 100644 --- a/test/e2e/app/main.go +++ b/test/e2e/app/main.go @@ -11,7 +11,6 @@ import ( "github.com/celestiaorg/celestia-core/config" "github.com/celestiaorg/celestia-core/crypto/ed25519" - "github.com/celestiaorg/celestia-core/ipfs" tmflags "github.com/celestiaorg/celestia-core/libs/cli/flags" "github.com/celestiaorg/celestia-core/libs/log" tmnet "github.com/celestiaorg/celestia-core/libs/net" @@ -104,7 +103,6 @@ func startNode(cfg *Config) error { proxy.NewLocalClientCreator(app), node.DefaultGenesisDocProviderFunc(tmcfg), node.DefaultDBProvider, - ipfs.Embedded(true, ipfs.DefaultConfig(), nodeLogger), node.DefaultMetricsProvider(tmcfg.Instrumentation), nodeLogger, ) diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go index db876c38de..11565ecba0 100644 --- a/test/e2e/runner/setup.go +++ b/test/e2e/runner/setup.go @@ -21,7 +21,6 @@ import ( "github.com/celestiaorg/celestia-core/config" "github.com/celestiaorg/celestia-core/crypto/ed25519" - "github.com/celestiaorg/celestia-core/ipfs" "github.com/celestiaorg/celestia-core/p2p" "github.com/celestiaorg/celestia-core/privval" e2e "github.com/celestiaorg/celestia-core/test/e2e/pkg" @@ -86,8 +85,7 @@ func Setup(testnet *e2e.Testnet) error { if err != nil { return err } - // todo(evan): the path should be a constant - cfg.IPFS.RepoPath = filepath.Join(nodeDir, ".ipfs") + config.WriteConfigFile(filepath.Join(nodeDir, "config", "config.toml"), cfg) // panics appCfg, err := MakeAppConfig(node) @@ -115,10 +113,6 @@ func Setup(testnet *e2e.Testnet) error { filepath.Join(nodeDir, PrivvalDummyKeyFile), filepath.Join(nodeDir, PrivvalDummyStateFile), )).Save() - err = ipfs.InitRepo(cfg.IPFS.RepoPath, logger) - if err != nil { - return err - } } return nil