Skip to content

Commit

Permalink
add more comments and cleanup. remove block index by height, just go …
Browse files Browse the repository at this point in the history
…to db for clarity
  • Loading branch information
lazynina committed Dec 10, 2024
1 parent 4caabaa commit 8a0bf0f
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 129 deletions.
65 changes: 14 additions & 51 deletions lib/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,34 +568,29 @@ type CheckpointBlockInfoAndError struct {
// best chain using the provided functions. Additionally, it always tracks
// the block tip and header tip.
type BlockIndex struct {
db *badger.DB
snapshot *Snapshot
blockIndexByHash *collections.LruCache[BlockHash, *BlockNode]
blockIndexByHeight *collections.LruCache[uint64, []*BlockNode]
tip *BlockNode
headerTip *BlockNode
db *badger.DB
snapshot *Snapshot
blockIndexByHash *collections.LruCache[BlockHash, *BlockNode]
tip *BlockNode
headerTip *BlockNode
}

// NewBlockIndex creates a new BlockIndex with the provided snapshot and tip node.
func NewBlockIndex(db *badger.DB, snapshot *Snapshot, tipNode *BlockNode) *BlockIndex {
blockIndexByHash, _ := collections.NewLruCache[BlockHash, *BlockNode](MaxBlockIndexNodes) // TODO: parameterize this?
blockIndexByHeight, _ := collections.NewLruCache[uint64, []*BlockNode](MaxBlockIndexNodes) // TODO: parameterize this?
blockIndexByHash, _ := collections.NewLruCache[BlockHash, *BlockNode](MaxBlockIndexNodes) // TODO: parameterize this?
return &BlockIndex{
db: db,
snapshot: snapshot,
blockIndexByHash: blockIndexByHash,
blockIndexByHeight: blockIndexByHeight,
tip: tipNode,
db: db,
snapshot: snapshot,
blockIndexByHash: blockIndexByHash,
tip: tipNode,
}
}

// setBlockIndexFromMap is a helper function only used in tests. It constructs the
// block index from the provided map of block hashes to block nodes.
func (bi *BlockIndex) setBlockIndexFromMap(input map[BlockHash]*BlockNode) {
newHashToBlockNodeMap, _ := collections.NewLruCache[BlockHash, *BlockNode](MaxBlockIndexNodes) // TODO: parameterize this?
newHeightToBlockNodeMap, _ := collections.NewLruCache[uint64, []*BlockNode](MaxBlockIndexNodes) // TODO: parameterize this?
newHashToBlockNodeMap, _ := collections.NewLruCache[BlockHash, *BlockNode](MaxBlockIndexNodes) // TODO: parameterize this?
bi.blockIndexByHash = newHashToBlockNodeMap
bi.blockIndexByHeight = newHeightToBlockNodeMap
for _, val := range input {
bi.addNewBlockNodeToBlockIndex(val)
// This function is always used for tests.
Expand Down Expand Up @@ -628,22 +623,6 @@ func (bi *BlockIndex) setTip(tip *BlockNode) {
// it to both the block index by hash and the block index by height.
func (bi *BlockIndex) addNewBlockNodeToBlockIndex(blockNode *BlockNode) {
bi.blockIndexByHash.Put(*blockNode.Hash, blockNode)
blocksAtHeight, exists := bi.blockIndexByHeight.Get(uint64(blockNode.Height))
if !exists || blocksAtHeight == nil {
blocksAtHeight = []*BlockNode{}
} else {
// TODO: we *could* make this more efficient by using a map,
// but generally we won't have many blocks at the same height.
// Make sure we don't add the same block node twice.
for _, blockAtHeight := range blocksAtHeight {
if blockAtHeight.Hash.IsEqual(blockNode.Hash) {
// If we've already seen this block node, just
// return early. there's nothing left to do.
return
}
}
}
bi.blockIndexByHeight.Put(uint64(blockNode.Height), append(blocksAtHeight, blockNode))
}

// GetBlockNodeByHashOnly retrieves a block node from the block index by its hash.
Expand Down Expand Up @@ -707,15 +686,6 @@ func (bi *BlockIndex) GetBlockNodesByHeight(height uint64) []*BlockNode {
if height > math.MaxUint32 {
glog.Fatalf("GetBlockNodesByHeight: Height %d is greater than math.MaxUint32", height)
}
// If we have it in the cache, just return it!
// TODO: maybe this isn't necessarily correct. We probably need to check the DB
// to be safe...
blockNodesAtHeight, exists := bi.blockIndexByHeight.Get(height)
if exists {
return blockNodesAtHeight
}
// If we don't have it in the cache, we need to get the block nodes from the database.
// We need to get the prefix key for the height hash to node prefix.
prefixKey := _heightHashToNodePrefixByHeight(uint32(height), false)
// Enumerate all block nodes for the prefix.
_, valsFound := EnumerateKeysForPrefix(bi.db, prefixKey, false)
Expand Down Expand Up @@ -943,18 +913,11 @@ func (bc *Blockchain) GetBlockIndex() *BlockIndex {
return bc.blockIndex
}

// getAllBlockNodesIndexedAtHeight returns all block nodes at a given height from the block index.
func (bc *Blockchain) getAllBlockNodesIndexedAtHeight(blockHeight uint64) []*BlockNode {
return bc.blockIndex.GetBlockNodesByHeight(blockHeight)
}

// hasBlockNodesIndexedAtHeight returns whether or not there are block nodes at a given height in the block index.
func (bc *Blockchain) hasBlockNodesIndexedAtHeight(blockHeight uint64) bool {
// TODO: there's an optimization we can do here where we just check if the
// cache has any block nodes at the height instead of loading everything
// from the database as well.
blockNodes := bc.blockIndex.GetBlockNodesByHeight(blockHeight)
return len(blockNodes) > 0
prefix := _heightHashToNodePrefixByHeight(uint32(blockHeight), false)
keysFound := EnumeratePaginatedLimitedKeysForPrefix(bc.db, prefix, prefix, 1)
return len(keysFound) > 0
}

// IsFullyStored determines if there are block nodes that haven't been fully stored or processed in the best block chain.
Expand Down
14 changes: 14 additions & 0 deletions lib/db_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,20 @@ func _enumerateKeysOnlyForPrefixWithTxn(txn *badger.Txn, dbPrefix []byte) (_keys
return _enumeratePaginatedLimitedKeysForPrefixWithTxn(txn, dbPrefix, dbPrefix, math.MaxUint32)
}

// EnumeratePaginatedLimitedKeysForPrefix opens a read-only view on db and
// delegates to _enumeratePaginatedLimitedKeysForPrefixWithTxn, returning the
// keys that helper finds for dbPrefix, starting at startKey and capped at limit.
//
// The error returned by db.View is intentionally discarded: the closure itself
// never returns one, and if the view cannot run the result is simply nil.
func EnumeratePaginatedLimitedKeysForPrefix(
	db *badger.DB,
	dbPrefix []byte,
	startKey []byte,
	limit uint32,
) (_keysFound [][]byte) {
	_ = db.View(func(readTxn *badger.Txn) error {
		// Write straight into the named result so it survives the closure.
		_keysFound = _enumeratePaginatedLimitedKeysForPrefixWithTxn(readTxn, dbPrefix, startKey, limit)
		return nil
	})
	return _keysFound
}

// _enumeratePaginatedLimitedKeysForPrefixWithTxn will look for keys in the db that are GREATER OR EQUAL to the startKey
// and satisfy the dbPrefix prefix. The total number of entries fetched will be EQUAL OR SMALLER than provided limit.
func _enumeratePaginatedLimitedKeysForPrefixWithTxn(txn *badger.Txn, dbPrefix []byte, startKey []byte, limit uint32) (_keysFound [][]byte) {
Expand Down
65 changes: 16 additions & 49 deletions lib/pos_blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@ func (bc *Blockchain) processHeaderPoS(header *MsgDeSoHeader, headerHash *BlockH

// If the incoming header is already part of the best header chain, then we can exit early.
// The header is not part of a fork, and is already an ancestor of the current header chain tip.
// Here we explicitly check the bestHeaderChain.ChainMap to make sure the in-memory struct is properly
// updated. This is necessary because the block index may have been updated with the header but the
// bestHeaderChain.ChainMap may not have been updated yet.
blockNode, isInBestHeaderChain, err := bc.GetBlockFromBestChainByHashAndOptionalHeight(headerHash, &header.Height, true)
blockNode, isInBestHeaderChain, err := bc.GetBlockFromBestChainByHashAndOptionalHeight(
headerHash, &header.Height, true)
if err != nil {
return nil, false, false,
errors.Wrapf(err, "processHeaderPoS: Problem getting block from best chain by hash: ")
Expand All @@ -75,13 +73,6 @@ func (bc *Blockchain) processHeaderPoS(header *MsgDeSoHeader, headerHash *BlockH
return blockNode, false, false, errors.Wrapf(err, "processHeaderPoS: Problem validating and indexing header: ")
}

// Don't worry about healing orphan children when we're syncing.
//if !bc.isSyncing() {
// // Now that we know we have a valid header, we check the block index for it any orphan children for it
// // and heal the parent pointers for all of them.
// bc.healPointersForOrphanChildren(blockNode)
//}

// Exit early if the header is an orphan.
if isOrphan {
return blockNode, false, true, nil
Expand All @@ -100,38 +91,6 @@ func (bc *Blockchain) processHeaderPoS(header *MsgDeSoHeader, headerHash *BlockH
return blockNode, true, false, nil
}

// healPointersForOrphanChildren fixes an inconsistency in the block index that may have
// occurred as a result of a node restart. In cases where we have an orphan node that we store in the
// DB, then on restart, that node's parent will not be in the block index. When processing the parent
// later on, we not only need to store the parent in the block index but also need to update the
// pointer from the orphan block's BlockNode to the parent. We do that dynamically here as we
// process headers.
//func (bc *Blockchain) healPointersForOrphanChildren(blockNode *BlockNode) {
// // Fetch all potential children of this blockNode from the block index.
// blockNodesAtNextHeight := bc.blockIndex.GetBlockNodesByHeight(blockNode.Header.Height + 1)
// exists := len(blockNodesAtNextHeight) > 0
// if !exists {
// // No children of this blockNode exist in the block index. Exit early.
// return
// }
//
// // Iterate through all block nodes at the next block height and update their parent pointers.
// for _, blockNodeAtNextHeight := range blockNodesAtNextHeight {
// // Check if it's a child of the parent block node.
// if !blockNodeAtNextHeight.Header.PrevBlockHash.IsEqual(blockNode.Hash) {
// continue
// }
//
// // Check if it has its parent pointer set. If it does, then we exit early.
// if blockNodeAtNextHeight.Parent != nil {
// continue
// }
//
// // If the parent block node is not set, then we set it to the parent block node.
// blockNodeAtNextHeight.Parent = blockNode
// }
//}

func (bc *Blockchain) validateAndIndexHeaderPoS(header *MsgDeSoHeader, headerHash *BlockHash, verifySignatures bool) (
_headerBlockNode *BlockNode, _isOrphan bool, _err error,
) {
Expand All @@ -155,8 +114,11 @@ func (bc *Blockchain) validateAndIndexHeaderPoS(header *MsgDeSoHeader, headerHas
}

// The header is an orphan. No need to store it in the block index. Exit early.
// TODO: validate that height - 1 > 0
parentBlockNode, parentBlockNodeExists := bc.blockIndex.GetBlockNodeByHashAndHeight(header.PrevBlockHash, header.Height-1)
if header.Height < 1 {
return nil, false, errors.New("validateAndIndexHeaderPoS: Header height is less than 1 - no valid parent height")
}
parentBlockNode, parentBlockNodeExists := bc.blockIndex.GetBlockNodeByHashAndHeight(
header.PrevBlockHash, header.Height-1)
if !parentBlockNodeExists {
return nil, true, nil
}
Expand Down Expand Up @@ -308,6 +270,9 @@ func (bc *Blockchain) processBlockPoS(block *MsgDeSoBlock, currentView uint64, v
"processBlockPoS: Unexpected problem getting lineage from committed tip: ")
}

if block.Header.Height < 1 {
return false, false, nil, errors.New("processBlockPoS: Block height is less than 1 - no valid parent height")
}
// We expect the utxoView for the parent block to be valid because we check that all ancestor blocks have
// been validated.
parentUtxoViewAndUtxoOps, err := bc.GetUtxoViewAndUtxoOpsAtBlockHash(*block.Header.PrevBlockHash, block.Header.Height-1)
Expand Down Expand Up @@ -685,7 +650,6 @@ func (bc *Blockchain) validateAndIndexBlockPoS(
) (*BlockNode, error) {

// Base case - Check if the block is validated or validate failed. If so, we can return early.
// TODO: validate height doesn't overflow uint32
blockNode, exists := bc.blockIndex.GetBlockNodeByHashAndHeight(blockHash, block.Header.Height)
if exists && (blockNode.IsValidateFailed() || blockNode.IsValidated()) {
return blockNode, nil
Expand Down Expand Up @@ -813,6 +777,9 @@ func (bc *Blockchain) validatePreviouslyIndexedBlockPoS(
// provided the block was cached in the block index and stored in the DB first.
return nil, errors.Wrapf(err, "validatePreviouslyIndexedBlockPoS: Problem fetching block from DB")
}
// A block at height 0 has no parent, so the Height-1 lookup below has no valid
// target (and would wrap around if Height is unsigned, as its uint64 uses
// elsewhere suggest). The error is prefixed with this function's name, not
// processBlockPoS, so failures are attributed to the right call site.
if block.Header.Height < 1 {
	return nil, errors.New("validatePreviouslyIndexedBlockPoS: Block height is less than 1 - no valid parent height")
}
// Build utxoView for the block's parent.
parentUtxoViewAndUtxoOps, err := bc.GetUtxoViewAndUtxoOpsAtBlockHash(*block.Header.PrevBlockHash, block.Header.Height-1)
if err != nil {
Expand Down Expand Up @@ -1659,7 +1626,6 @@ func (bc *Blockchain) addTipBlockToBestChain(blockNode *BlockNode) {
func (bc *Blockchain) removeTipBlockFromBestChain() *BlockNode {
// Remove the last block from the best chain.
lastBlock := bc.blockIndex.GetTip()
// Uhhh what happens if we don't have the parent set up!?
bc.blockIndex.setTip(lastBlock.GetParent(bc.blockIndex))
return lastBlock
}
Expand Down Expand Up @@ -2068,12 +2034,13 @@ func (bc *Blockchain) getSafeBlockNodes() ([]*BlockNode, error) {
safeBlocks := []*BlockNode{committedTip}
maxHeightWithSafeBlocks := bc.getMaxSequentialBlockHeightAfter(uint64(committedTip.Height))
for ii := uint64(committedTip.Height + 1); ii < maxHeightWithSafeBlocks+1; ii++ {
blockNodes := bc.blockIndex.GetBlockNodesByHeight(ii)
// If we don't have any blocks at this height, we know that any blocks at a later height are not safe blocks.
if !bc.hasBlockNodesIndexedAtHeight(ii) {
if len(blockNodes) == 0 {
break
}
hasSeenValidatedBlockAtThisHeight := false
blockNodes := bc.getAllBlockNodesIndexedAtHeight(ii)

for _, blockNode := range blockNodes {
// TODO: Are there other conditions we should consider?
if blockNode.IsValidated() {
Expand Down
3 changes: 0 additions & 3 deletions lib/pos_blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ func TestUpsertBlockAndBlockNodeToDB(t *testing.T) {
require.Len(t, byHeightBlockNodes, 1)
require.True(t, byHeightBlockNodes[0].Hash.IsEqual(newHash))
require.True(t, bc.hasBlockNodesIndexedAtHeight(2))
require.Len(t, bc.getAllBlockNodesIndexedAtHeight(2), 1)
// Check the DB for the block
uncommittedBlock, err := GetBlock(newHash, bc.db, bc.snapshot)
require.NoError(t, err)
Expand All @@ -414,7 +413,6 @@ func TestUpsertBlockAndBlockNodeToDB(t *testing.T) {
require.True(t, byHeightBlockNodes[0].Hash.IsEqual(newHash))
require.True(t, byHeightBlockNodes[0].IsValidated())
require.True(t, bc.hasBlockNodesIndexedAtHeight(2))
require.Len(t, bc.getAllBlockNodesIndexedAtHeight(2), 1)

// Okay now we'll put in another block at the same height.
// Update the random seed hash so we have a new hash for the block.
Expand All @@ -441,7 +439,6 @@ func TestUpsertBlockAndBlockNodeToDB(t *testing.T) {
require.True(t, byHeightBlockNodes[0].Hash.IsEqual(newHash) || byHeightBlockNodes[1].Hash.IsEqual(newHash))
require.True(t, byHeightBlockNodes[0].Hash.IsEqual(updatedBlockHash) || byHeightBlockNodes[1].Hash.IsEqual(updatedBlockHash))
require.True(t, bc.hasBlockNodesIndexedAtHeight(2))
require.Len(t, bc.getAllBlockNodesIndexedAtHeight(2), 2)

// If we're missing a field in the header, we should get an error
// as we can't compute the hash.
Expand Down
34 changes: 8 additions & 26 deletions lib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ func ValidateHyperSyncFlags(isHypersync bool, syncType NodeSyncType) {
}
}

// RunBlockIndexMigrationOnce runs the block index migration once and saves a file to
// indicate that it has been run.
func RunBlockIndexMigrationOnce(db *badger.DB, params *DeSoParams) error {
blockIndexMigrationFileName := filepath.Join(db.Opts().Dir, BlockIndexMigrationFileName)
glog.V(2).Info("FileName: ", blockIndexMigrationFileName)
Expand Down Expand Up @@ -771,16 +773,6 @@ func (srv *Server) _handleGetHeaders(pp *Peer, msg *MsgDeSoGetHeaders) {
if pp.NegotiatedProtocolVersion >= ProtocolVersion2 {
maxHeadersPerMsg = MaxHeadersPerMsgPos
}
// FIXME: We can eliminate the call to LocateBestBlockChainHeaders and do a much
// simpler "shortcut" version that doesn't require complicated tree-traversal bs.
// The shortcut would be to just return all headers starting from msg.BlockLocator[0]
// up to msg.StopHash or maxHeadersPerMsg, whichever comes first. This would allow
// other nodes to sync from us and *keep* in sync with us, while allowing us to delete
// ALL of the complicated logic around locators and the best header chain. This all works
// because msg.BlockLocator[0] is the requesting-node's tip hash. The rest of the
// hashes, and all of the locator bs, are only needed to resolve forks, which can't
// happen with PoS anymore.
//headers := srv.blockchain.LocateBestBlockChainHeaders(msg.BlockLocator, msg.StopHash, maxHeadersPerMsg)

headers, err := srv.GetHeadersForLocatorAndStopHash(msg.BlockLocator, msg.StopHash, maxHeadersPerMsg)
if err != nil {
Expand All @@ -799,6 +791,8 @@ func (srv *Server) _handleGetHeaders(pp *Peer, msg *MsgDeSoGetHeaders) {
headers, blockTip.Hash, blockTip.Height, pp)
}

// GetHeadersForLocatorAndStopHash returns a list of headers given a list of locator block hashes
// and a stop hash. Note that this may be slow if the block nodes requested are not in the cache.
func (srv *Server) GetHeadersForLocatorAndStopHash(
locator []*BlockHash,
stopHash *BlockHash,
Expand Down Expand Up @@ -1214,7 +1208,6 @@ func (srv *Server) _handleHeaderBundle(pp *Peer, msg *MsgDeSoHeaderBundle) {

// If we get here then we have a header we haven't seen before.
// check if we need to verify signatures
// TODO: we can add some logic into should verify signatures to avoid trying to get the checkpoint block node.
verifySignatures, shouldDisconnect := srv.shouldVerifySignatures(headerReceived, true)
if shouldDisconnect {
glog.Errorf("Server._handleHeaderBundle: Disconnecting peer %v in state %s because a mismatch was "+
Expand Down Expand Up @@ -1489,7 +1482,6 @@ func (srv *Server) _handleHeaderBundle(pp *Peer, msg *MsgDeSoHeaderBundle) {
glog.V(1).Infof("Server._handleHeaderBundle: *Syncing* headers for blocks starting at "+
"header tip %v out of %d from peer %v",
headerTip.Header, msg.TipHeight, pp)
// TODO: this may be wrong?
glog.V(0).Infof("Server._handleHeaderBundle: Num Headers in header chain: (header tip height: %v) ",
srv.blockchain.blockIndex.GetHeaderTip())
}
Expand Down Expand Up @@ -1781,7 +1773,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
}
if !exists {
glog.Errorf("Server._handleSnapshot: Problem getting block node by height, block node does not exist: (%v)", msg.SnapshotMetadata.SnapshotBlockHeight)
//return
return
} else {
glog.Infof(CLog(Yellow, fmt.Sprintf("Best header chain %v best block chain %v",
blockNode, srv.blockchain.blockIndex.GetTip())))
Expand Down Expand Up @@ -1836,21 +1828,11 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
}()
// acquire the chain lock while we update the best chain and best chain map.
srv.blockchain.ChainLock.Lock()
// TODO: we should iterate in reverse so we can use GetBlockFromBestChainByHashAndOptionalHeight
// by doing currentNode.Height - 1 and currentNode.Header.PrevBlockHash.
currentNode, currentNodeExists, err := srv.blockchain.GetBlockFromBestChainByHeight(srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight, true)
if err != nil {
glog.Errorf("Server._handleSnapshot: Problem getting block node by height, error: (%v)", err)
// TODO: should we return here?
}
if !currentNodeExists {
glog.Errorf("Server._handleSnapshot: Problem getting block node by height, block node does not exist")
// TODO: should we return here?
}
currentNode := blockNode
currentNodeExists := true
// Set the block tip to the snapshot height block node.
srv.blockchain.blockIndex.setTip(currentNode)
for currentNode.Height > 0 {
//for ii := uint64(1); ii <= srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight; ii++ {
// Do not set the StatusBlockStored flag, because we still need to download the past blocks.
currentNode.Status |= StatusBlockProcessed
currentNode.Status |= StatusBlockValidated
Expand All @@ -1876,7 +1858,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) {
break
}
// TODO: should we adjust this value for batch sizes?
if len(blockNodeBatch) < 10000 {
if len(blockNodeBatch) < 25000 {
continue
}
err = PutHeightHashToNodeInfoBatch(srv.blockchain.db, srv.snapshot, blockNodeBatch, false /*bitcoinNodes*/, srv.eventManager)
Expand Down

0 comments on commit 8a0bf0f

Please sign in to comment.