Skip to content

Commit

Permalink
review comments, add reorg unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
Larry Ruane committed Dec 19, 2019
1 parent 74b9cba commit 66e22da
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func main() {
flag.StringVar(&opts.zcashConfPath, "conf-file", "./zcash.conf", "conf file to pull RPC creds from")
flag.BoolVar(&opts.veryInsecure, "no-tls-very-insecure", false, "run without the required TLS certificate, only for debugging, DO NOT use in production")
flag.BoolVar(&opts.wantVersion, "version", false, "version (major.minor.patch)")
flag.IntVar(&opts.cacheSize, "cache-size", 40000, "number of blocks to hold in the cache")
flag.IntVar(&opts.cacheSize, "cache-size", 80000, "number of blocks to hold in the cache")

// TODO prod metrics
// TODO support config from file and env vars
Expand Down
62 changes: 40 additions & 22 deletions common/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,53 @@ type BlockCacheEntry struct {
type BlockCache struct {
MaxEntries int

FirstBlock int
LastBlock int

m map[int]*BlockCacheEntry
// m[firstBlock..nextBlock) are valid
m map[int]*BlockCacheEntry
firstBlock int
nextBlock int

mutex sync.RWMutex
}

func NewBlockCache(maxEntries int) *BlockCache {
return &BlockCache{
MaxEntries: maxEntries,
FirstBlock: -1,
LastBlock: -1,
m: make(map[int]*BlockCacheEntry),
}
}

func (c *BlockCache) Add(height int, block *walletrpc.CompactBlock) (error, bool) {
// Invariant: m[firstBlock..nextBlock) are valid.
c.mutex.Lock()
defer c.mutex.Unlock()

// If we already have this block or any higher blocks, a reorg
// must have occurred; these must be re-added
for i := height; i <= c.LastBlock; i++ {
if height > c.nextBlock {
// restarting the cache (never happens currently), or first time
for i := c.firstBlock; i < c.nextBlock; i++ {
delete(c.m, i)
}
c.firstBlock = height
c.nextBlock = height
}
// Invariant: m[firstBlock..nextBlock) are valid.

// If we already have this block, a reorg must have occurred;
// this block (and all higher) must be re-added.
h := height
if h < c.firstBlock {
h = c.firstBlock
}
for i := h; i < c.nextBlock; i++ {
delete(c.m, i)
}
c.nextBlock = height
if c.firstBlock > c.nextBlock {
c.firstBlock = c.nextBlock
}
// Invariant: m[firstBlock..nextBlock) are valid.

// Detect reorg, ingestor needs to handle it
if c.m[height-1] != nil && !bytes.Equal(block.PrevHash, c.m[height-1].hash) {
if height > c.firstBlock && !bytes.Equal(block.PrevHash, c.m[height-1].hash) {
return nil, true
}

Expand All @@ -54,22 +72,20 @@ func (c *BlockCache) Add(height int, block *walletrpc.CompactBlock) (error, bool
println("Error marshalling block!")
return err, false
}

c.m[height] = &BlockCacheEntry{
data: data,
hash: block.GetHash(),
}

c.LastBlock = height
if c.FirstBlock < 0 || c.FirstBlock > height {
c.FirstBlock = height
}
c.nextBlock++
// Invariant: m[firstBlock..nextBlock) are valid.

// remove any blocks that are older than the capacity of the cache
for c.FirstBlock <= c.LastBlock-c.MaxEntries {
delete(c.m, c.FirstBlock)
c.FirstBlock++
for c.firstBlock < c.nextBlock-c.MaxEntries {
// Invariant: m[firstBlock..nextBlock) are valid.
delete(c.m, c.firstBlock)
c.firstBlock++
}
// Invariant: m[firstBlock..nextBlock) are valid.

return nil, false
}
Expand All @@ -78,7 +94,7 @@ func (c *BlockCache) Get(height int) *walletrpc.CompactBlock {
c.mutex.RLock()
defer c.mutex.RUnlock()

if c.m[height] == nil {
if height < c.firstBlock || height >= c.nextBlock {
return nil
}

Expand All @@ -95,6 +111,8 @@ func (c *BlockCache) Get(height int) *walletrpc.CompactBlock {
func (c *BlockCache) GetLatestBlock() int {
c.mutex.RLock()
defer c.mutex.RUnlock()

return c.LastBlock
if c.firstBlock == c.nextBlock {
return -1
}
return c.nextBlock - 1
}
248 changes: 248 additions & 0 deletions common/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package common

import (
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"testing"

"github.com/pkg/errors"

"github.com/zcash-hackworks/lightwalletd/parser"
"github.com/zcash-hackworks/lightwalletd/walletrpc"
)

func TestCache(t *testing.T) {
type compactTest struct {
BlockHeight int `json:"block"`
BlockHash string `json:"hash"`
PrevHash string `json:"prev"`
Full string `json:"full"`
Compact string `json:"compact"`
}
var compactTests []compactTest
var compacts []*walletrpc.CompactBlock

blockJSON, err := ioutil.ReadFile("../testdata/compact_blocks.json")
if err != nil {
t.Fatal(err)
}

err = json.Unmarshal(blockJSON, &compactTests)
if err != nil {
t.Fatal(err)
}
cache := NewBlockCache(4)

// derive compact blocks from file data (setup, not part of the test)
for _, test := range compactTests {
blockData, _ := hex.DecodeString(test.Full)
block := parser.NewBlock()
blockData, err = block.ParseFromSlice(blockData)
if err != nil {
t.Error(errors.Wrap(err, fmt.Sprintf("parsing testnet block %d", test.BlockHeight)))
continue
}
compacts = append(compacts, block.ToCompact())
}

// initially empty cache
if cache.GetLatestBlock() != -1 {
t.Fatal("reorg, unexpected GetLatestBlock")
}

// normal, sunny-day case, 6 blocks, add as blocks 10-15
for i, compact := range compacts {
err, reorg := cache.Add(10+i, compact)
if err != nil {
t.Fatal(err)
}
if reorg {
t.Fatal("unexpected reorg")
}
if cache.GetLatestBlock() != 10+i {
t.Fatal("unexpected GetLatestBlock")
}
// The test blocks start at height 289460
if int(cache.Get(10+i).Height) != 289460+i {
t.Fatal("unexpected block contents")
}
}
if len(cache.m) != 4 { // max entries is 4
t.Fatal("unexpected number of cache entries:", len(cache.m))
}
if cache.firstBlock != 16-4 {
t.Fatal("unexpected firstBlock:", cache.firstBlock)
}
if cache.nextBlock != 16 {
t.Fatal("unexpected nextBlock:", cache.nextBlock)
}

// No entries just before and just after the cache range
if cache.Get(11) != nil || cache.Get(16) != nil {
t.Fatal("unexpected Get:", cache.nextBlock)
}

// We can re-add the last block (with the same data) and
// that should just replace and not be considered a reorg
err, reorg := cache.Add(15, compacts[5])
if err != nil {
t.Fatal(err)
}
if reorg {
t.Fatal("replace, unexpected reorg")
}
if len(cache.m) != 4 {
t.Fatal("replace, unexpected number of blocks:", len(cache.m))
}
if cache.firstBlock != 16-4 {
t.Fatal("replace, unexpected firstBlock:", cache.firstBlock)
}
if cache.nextBlock != 16 {
t.Fatal("replace, unexpected nextBlock:", cache.nextBlock)
}

// Simulate a reorg by resubmitting as the next block, 16, any block with
// the wrote prev-hash (let's use the first, just because it's handy)
err, reorg = cache.Add(16, compacts[0])
if err != nil {
t.Fatal(err)
}
if !reorg {
t.Fatal("unexpected non-reorg")
}
// The cache shouldn't have changed in any way
if cache.Get(16) != nil {
t.Fatal("reorg, unexpected block 16 exists")
}
if cache.GetLatestBlock() != 15 {
t.Fatal("reorg, unexpected GetLatestBlock")
}
if int(cache.Get(15).Height) != 289460+5 {
t.Fatal("reorg, unexpected Get")
}
if len(cache.m) != 4 {
t.Fatal("reorg, unexpected number of cache entries")
}

// In response to the reorg being detected, we must back up until we
// reach a block that's before the reorg (where the chain split).
// Let's back up one block, to height 15, request it from zcashd,
// but let's say this block is from the new branch, so we haven't
// gone back far enough, so this will still be disallowed.
err, reorg = cache.Add(15, compacts[0])
if err != nil {
t.Fatal(err)
}
if !reorg {
t.Fatal("unexpected non-reorg")
}
// the cache deleted block 15 (it's definitely wrong)
if cache.Get(15) != nil {
t.Fatal("reorg, unexpected block 15 exists")
}
if cache.GetLatestBlock() != 14 {
t.Fatal("reorg, unexpected GetLatestBlock")
}
if int(cache.Get(14).Height) != 289460+4 {
t.Fatal("reorg, unexpected Get")
}
// now only 3 entries (12-14)
if len(cache.m) != 3 {
t.Fatal("reorg, unexpected number of cache entries")
}

// Back up a couple more, try to re-add height 13, and suppose
// that's before the split (for example, there were two 14s).
// (In this test, we're replacing 13 with the same block; in
// real life, we'd be replacing it with a different version of
// 13 that has the same prev-hash).
err, reorg = cache.Add(13, compacts[3])
if err != nil {
t.Fatal(err)
}
if reorg {
t.Fatal("unexpected reorg")
}
// 13 was replaced (with the same block), but that means
// everything after 13 is deleted
if cache.Get(14) != nil {
t.Fatal("reorg, unexpected block 14 exists")
}
if cache.GetLatestBlock() != 13 {
t.Fatal("reorg, unexpected GetLatestBlock")
}
if int(cache.Get(13).Height) != 289460+3 {
t.Fatal("reorg, unexpected Get")
}
if int(cache.Get(12).Height) != 289460+2 {
t.Fatal("reorg, unexpected Get")
}
// down to 2 entries (12-13)
if len(cache.m) != 2 {
t.Fatal("reorg, unexpected number of cache entries")
}

// Now we can continue forward from here
err, reorg = cache.Add(14, compacts[4])
if err != nil {
t.Fatal(err)
}
if reorg {
t.Fatal("unexpected reorg")
}
if cache.GetLatestBlock() != 14 {
t.Fatal("reorg, unexpected GetLatestBlock")
}
if int(cache.Get(14).Height) != 289460+4 {
t.Fatal("reorg, unexpected Get")
}
if len(cache.m) != 3 {
t.Fatal("reorg, unexpected number of cache entries")
}

// It's possible, although unlikely, that after a reorg is detected,
// we back up so much that we're before the start of the cache
// (especially if the cache is very small). This should remove the
// entire cache before adding the new entry.
if cache.firstBlock != 12 {
t.Fatal("unexpected firstBlock")
}
err, reorg = cache.Add(10, compacts[0])
if err != nil {
t.Fatal(err)
}
if reorg {
t.Fatal("unexpected reorg")
}
if cache.GetLatestBlock() != 10 {
t.Fatal("reorg, unexpected GetLatestBlock")
}
if int(cache.Get(10).Height) != 289460+0 {
t.Fatal("reorg, unexpected Get")
}
if len(cache.m) != 1 {
t.Fatal("reorg, unexpected number of cache entries")
}

// Another weird case (not currently possible) is adding a block at
// a height that is not one higher than the current latest block.
// This should remove the entire cache before adding the new entry.
err, reorg = cache.Add(20, compacts[0])
if err != nil {
t.Fatal(err)
}
if reorg {
t.Fatal("unexpected reorg")
}
if cache.GetLatestBlock() != 20 {
t.Fatal("reorg, unexpected GetLatestBlock")
}
if int(cache.Get(20).Height) != 289460 {
t.Fatal("reorg, unexpected Get")
}
if len(cache.m) != 1 {
t.Fatal("reorg, unexpected number of cache entries")
}
}
4 changes: 3 additions & 1 deletion common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func BlockIngestor(rpcClient *rpcclient.Client, cache *BlockCache, log *logrus.E

// Check for reorgs once we have inital block hash from startup
if reorg {
height -= 10
// This must back up at least 1, but it's arbitrary, any value
// will work; this is probably a good balance.
height -= 2
reorgCount++
if reorgCount > 10 {
log.Fatal("Reorg exceeded max of 100 blocks! Help!")
Expand Down

0 comments on commit 66e22da

Please sign in to comment.