Port features from horizon-v2.28.2 into master (stellar#5210)
Shaptic authored Feb 15, 2024
2 parents 73de95c + 531a01f commit 1b0aa07
Showing 19 changed files with 539 additions and 80 deletions.
go.mod: 3 changes (3 additions, 0 deletions)
@@ -14,6 +14,7 @@ require (
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
github.com/aws/aws-sdk-go v1.45.26
github.com/creachadair/jrpc2 v1.1.0
github.com/djherbis/fscache v0.10.1
github.com/elazarl/go-bindata-assetfs v1.0.1
github.com/getsentry/raven-go v0.2.0
github.com/go-chi/chi v4.1.2+incompatible
@@ -91,6 +92,8 @@ require (
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231211222908-989df2bf70f3 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
gopkg.in/djherbis/atime.v1 v1.0.0 // indirect
gopkg.in/djherbis/stream.v1 v1.3.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

go.sum: 6 changes (6 additions, 0 deletions)
@@ -104,6 +104,8 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/djherbis/fscache v0.10.1 h1:hDv+RGyvD+UDKyRYuLoVNbuRTnf2SrA2K3VyR1br9lk=
github.com/djherbis/fscache v0.10.1/go.mod h1:yyPYtkNnnPXsW+81lAcQS6yab3G2CRfnPLotBvtbf0c=
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
@@ -814,6 +816,10 @@
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/djherbis/atime.v1 v1.0.0 h1:eMRqB/JrLKocla2PBPKgQYg/p5UG4L6AUAs92aP7F60=
gopkg.in/djherbis/atime.v1 v1.0.0/go.mod h1:hQIUStKmJfvf7xdh/wtK84qe+DsTV5LnA9lzxxtPpJ8=
gopkg.in/djherbis/stream.v1 v1.3.1 h1:uGfmsOY1qqMjQQphhRBSGLyA9qumJ56exkRu9ASTjCw=
gopkg.in/djherbis/stream.v1 v1.3.1/go.mod h1:aEV8CBVRmSpLamVJfM903Npic1IKmb2qS30VAZ+sssg=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gavv/httpexpect.v1 v1.0.0-20170111145843-40724cf1e4a0 h1:r5ptJ1tBxVAeqw4CrYWhXIMr0SybY3CDHuIbCg5CFVw=
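Note: github.com/djherbis/fscache, added above, is the on-disk read-through cache that historyarchive/archive.go builds on below. A minimal sketch of its contract, with an illustrative key and contents (not from this commit): Get returns a reader for a key and, when the key is new, a non-nil writer that the caller must fill and close before the entry is complete.

	package main

	import (
		"fmt"
		"io"
		"strings"

		fscache "github.com/djherbis/fscache"
	)

	func main() {
		// A cache rooted at ./cache, 0755 dirs; expiry 0 = entries never expire.
		c, err := fscache.New("./cache", 0755, 0)
		if err != nil {
			panic(err)
		}

		// For a NEW key, Get returns both a reader and a non-nil writer.
		r, w, err := c.Get("bucket-abcd.xdr.gz") // illustrative key
		if err != nil {
			panic(err)
		}
		if w != nil {
			// Fill the entry; the reader streams whatever has been written.
			go func() {
				defer w.Close()
				io.Copy(w, strings.NewReader("fake bucket bytes"))
			}()
		}

		data, _ := io.ReadAll(r)
		r.Close()
		fmt.Printf("read %d cached bytes\n", len(data))
	}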
historyarchive/archive.go: 130 changes (108 additions, 22 deletions)
@@ -11,12 +11,15 @@ import (
"fmt"
"io"
"net/url"
"os"
"path"
"regexp"
"strconv"
"strings"
"sync"
"time"

fscache "github.com/djherbis/fscache"
log "github.com/sirupsen/logrus"

"github.com/stellar/go/support/errors"
@@ -38,16 +41,17 @@ type CommandOptions struct {
}

type ArchiveOptions struct {
storage.ConnectOptions

// NetworkPassphrase defines the expected network of history archive. It is
// checked when getting HAS. If network passphrase does not match, error is
// returned.
NetworkPassphrase string
// CheckpointFrequency is the number of ledgers between checkpoints
// if unset, DefaultCheckpointFrequency will be used
CheckpointFrequency uint32
-	storage.ConnectOptions
-	// CacheConfig controls how/if bucket files are cached on the disk.
-	CacheConfig CacheOptions
// CachePath controls where/if bucket files are cached on the disk.
CachePath string
}

type Ledger struct {
@@ -104,8 +108,15 @@ type Archive struct {
checkpointManager CheckpointManager

backend storage.Storage
-	cache *ArchiveBucketCache
stats archiveStats
cache *archiveBucketCache
}

type archiveBucketCache struct {
fscache.Cache

path string
sizes sync.Map
}

func (arch *Archive) GetStats() []ArchiveStats {
@@ -383,23 +394,79 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
}

func (a *Archive) cachedGet(pth string) (io.ReadCloser, error) {
-	if a.cache != nil {
-		rdr, foundInCache, err := a.cache.GetFile(pth, a.backend)
-		if !foundInCache {
-			a.stats.incrementDownloads()
-		} else {
-			a.stats.incrementCacheHits()
-		}
-		if err == nil {
-			return rdr, nil
if a.cache == nil {
a.stats.incrementDownloads()
return a.backend.GetFile(pth)
}

L := log.WithField("path", pth).WithField("cache", a.cache.path)

rdr, wrtr, err := a.cache.Get(pth)
if err != nil {
L.WithError(err).
WithField("remove", a.cache.Remove(pth)).
Warn("On-disk cache retrieval failed")
a.stats.incrementDownloads()
return a.backend.GetFile(pth)
}

	// If a NEW key is being retrieved, the cache returns a writer to which
	// you're expected to write the upstream contents, as well as a reader
	// that will read directly from it.
if wrtr != nil {
log.WithField("path", pth).Info("Caching file...")
a.stats.incrementDownloads()
upstreamReader, err := a.backend.GetFile(pth)
if err != nil {
writeErr := wrtr.Close()
readErr := rdr.Close()
removeErr := a.cache.Remove(pth)
			// Execution order isn't guaranteed within a function call
			// expression, so we close them in an explicit order first.
L.WithError(err).WithFields(log.Fields{
"write-close": writeErr,
"read-close": readErr,
"cache-rm": removeErr,
}).Warn("Download failed, purging from cache")
return nil, err
}

-		// If there's an error, retry with the uncached backend.
-		a.cache.Evict(pth)
// Start a goroutine to slurp up the upstream and feed
// it directly to the cache.
go func() {
written, err := io.Copy(wrtr, upstreamReader)
writeErr := wrtr.Close()
readErr := upstreamReader.Close()
fields := log.Fields{
"wr-close": writeErr,
"rd-close": readErr,
}

if err != nil {
L.WithFields(fields).WithError(err).
Warn("Failed to download and cache file")

// Removal must happen *after* handles close.
if removalErr := a.cache.Remove(pth); removalErr != nil {
L.WithError(removalErr).Warn("Removing cached file failed")
}
} else {
L.WithFields(fields).Infof("Cached %dKiB file", written/1024)

				// Track how much bandwidth the cache saves by recording
				// the size of the file we just downloaded.
a.cache.sizes.Store(pth, written)
}
}()
} else {
// Best-effort check to track bandwidth metrics
if written, found := a.cache.sizes.Load(pth); found {
a.stats.incrementCacheBandwidth(written.(int64))
}
a.stats.incrementCacheHits()
}

-	a.stats.incrementDownloads()
-	return a.backend.GetFile(pth)
return rdr, nil
}
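
Note: distilled, cachedGet's read-through flow is: ask the cache for the key; a non-nil writer means a miss, so the backend download is copied into the cache on a background goroutine while the returned reader streams the bytes as they arrive. A hypothetical sketch without the stats and logging (readThrough and fetch are illustrative names, not part of this package):

	package cachesketch

	import (
		"io"

		fscache "github.com/djherbis/fscache"
	)

	// readThrough returns a reader for key, filling the cache from fetch
	// on a miss and falling back to fetch entirely if the cache is broken.
	func readThrough(
		cache fscache.Cache,
		key string,
		fetch func() (io.ReadCloser, error),
	) (io.ReadCloser, error) {
		rdr, wrtr, err := cache.Get(key)
		if err != nil {
			return fetch() // cache unusable: plain download
		}
		if wrtr == nil {
			return rdr, nil // already cached: serve from disk
		}

		upstream, err := fetch()
		if err != nil {
			// Close both handles before removing the entry.
			wrtr.Close()
			rdr.Close()
			cache.Remove(key)
			return nil, err
		}

		// Fill the entry in the background; rdr streams what's written.
		go func() {
			defer upstream.Close()
			if _, err := io.Copy(wrtr, upstream); err != nil {
				wrtr.Close()
				cache.Remove(key) // drop the partial entry
				return
			}
			wrtr.Close()
		}()
		return rdr, nil
	}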

func (a *Archive) cachedExists(pth string) (bool, error) {
@@ -439,13 +506,30 @@ func Connect(u string, opts ArchiveOptions) (*Archive, error) {
return &arch, err
}

-	if opts.CacheConfig.Cache {
-		cache, innerErr := MakeArchiveBucketCache(opts.CacheConfig)
-		if innerErr != nil {
-			return &arch, innerErr
if opts.CachePath != "" {
		// Set up a <= ~10GiB LRU cache for history archive files
haunter := fscache.NewLRUHaunterStrategy(
fscache.NewLRUHaunter(0, 10<<30, time.Minute /* frequency check */),
)

// Wipe any existing cache on startup
os.RemoveAll(opts.CachePath)
fs, err := fscache.NewFs(opts.CachePath, 0755 /* drwxr-xr-x */)

if err != nil {
return &arch, errors.Wrapf(err,
"creating cache at '%s' with mode 0755 failed",
opts.CachePath)
}

cache, err := fscache.NewCacheWithHaunter(fs, haunter)
if err != nil {
return &arch, errors.Wrapf(err,
"creating cache at '%s' failed",
opts.CachePath)
}

-	arch.cache = cache
arch.cache = &archiveBucketCache{cache, opts.CachePath, sync.Map{}}
}

arch.stats = archiveStats{backendName: u}
@@ -467,6 +551,8 @@ func ConnectBackend(u string, opts storage.ConnectOptions) (storage.Storage, error) {

if parsed.Scheme == "mock" {
backend = makeMockBackend()
} else if parsed.Scheme == "fmock" {
backend = makeFailingMockBackend()
} else {
backend, err = storage.ConnectBackend(u, opts)
}
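Note: with CacheConfig collapsed into a single CachePath string, opting into the disk cache takes one extra field. A minimal, hypothetical consumer sketch (the archive URL and network passphrase are placeholders):

	package main

	import (
		"log"
		"os"
		"path/filepath"

		"github.com/stellar/go/historyarchive"
	)

	func main() {
		archive, err := historyarchive.Connect(
			"https://history.example.org/archive", // placeholder URL
			historyarchive.ArchiveOptions{
				NetworkPassphrase:   "Example Network ; 2024", // placeholder
				CheckpointFrequency: 64,
				// A non-empty CachePath enables the ~10GiB on-disk LRU cache.
				// The directory is wiped on startup, so use scratch space.
				CachePath: filepath.Join(os.TempDir(), "history-archive-cache"),
			})
		if err != nil {
			log.Fatal(err)
		}

		has, err := archive.GetRootHAS()
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("archive root HAS is at ledger %d", has.CurrentLedger)
	}

Subsequent reads through GetXdrStream or GetLedgers are then served from the on-disk cache instead of re-downloading bucket files.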
historyarchive/archive_test.go: 137 changes (98 additions, 39 deletions)
@@ -18,13 +18,18 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var cachePath = filepath.Join(os.TempDir(), "history-archive-test-cache")

func GetTestS3Archive() *Archive {
mx := big.NewInt(0xffffffff)
r, e := rand.Int(rand.Reader, mx)
@@ -49,13 +54,10 @@ func GetTestS3Archive() *Archive {
}

func GetTestMockArchive() *Archive {
-	return MustConnect("mock://test",
-		ArchiveOptions{CheckpointFrequency: 64,
-			CacheConfig: CacheOptions{
-				Cache:    true,
-				Path:     filepath.Join(os.TempDir(), "history-archive-test-cache"),
-				MaxFiles: 5,
-			}})
return MustConnect("mock://test", ArchiveOptions{
CheckpointFrequency: 64,
CachePath: cachePath,
})
}

var tmpdirs []string
@@ -563,7 +565,95 @@ func TestGetLedgers(t *testing.T) {
assert.Equal(t, uint32(1), archive.GetStats()[0].GetRequests())
assert.Equal(t, uint32(0), archive.GetStats()[0].GetDownloads())
assert.EqualError(t, err, "checkpoint 1023 is not published")
ledgerHeaders, transactions, results := makeFakeArchive(t, archive)

stats := archive.GetStats()[0]
ledgers, err := archive.GetLedgers(1000, 1002)

assert.NoError(t, err)
assert.Len(t, ledgers, 3)
// it started at 1, incurred 6 requests total: 3 queries + 3 downloads
assert.EqualValues(t, 7, stats.GetRequests())
// started 0, incurred 3 file downloads
assert.EqualValues(t, 3, stats.GetDownloads())
assert.EqualValues(t, 0, stats.GetCacheHits())
for i, seq := range []uint32{1000, 1001, 1002} {
ledger := ledgers[seq]
assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
assertXdrEquals(t, transactions[i], ledger.Transaction)
assertXdrEquals(t, results[i], ledger.TransactionResult)
}

// Repeat the same check but ensure the cache was used
ledgers, err = archive.GetLedgers(1000, 1002) // all cached
assert.NoError(t, err)
assert.Len(t, ledgers, 3)

// downloads should not change because of the cache
assert.EqualValues(t, 3, stats.GetDownloads())
// but requests increase because of 3 fetches to categories
assert.EqualValues(t, 10, stats.GetRequests())
assert.EqualValues(t, 3, stats.GetCacheHits())
for i, seq := range []uint32{1000, 1001, 1002} {
ledger := ledgers[seq]
assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
assertXdrEquals(t, transactions[i], ledger.Transaction)
assertXdrEquals(t, results[i], ledger.TransactionResult)
}

	// remove the cached files without informing the cache and ensure it fills up again
require.NoError(t, os.RemoveAll(cachePath))
ledgers, err = archive.GetLedgers(1000, 1002) // uncached, refetch
assert.NoError(t, err)
assert.Len(t, ledgers, 3)

// downloads should increase again
assert.EqualValues(t, 6, stats.GetDownloads())
assert.EqualValues(t, 3, stats.GetCacheHits())
}

func TestStressfulGetLedgers(t *testing.T) {
archive := GetTestMockArchive()
ledgerHeaders, transactions, results := makeFakeArchive(t, archive)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)

go func() {
time.Sleep(time.Millisecond) // encourage interleaved execution
ledgers, err := archive.GetLedgers(1000, 1002)
assert.NoError(t, err)
assert.Len(t, ledgers, 3)
for i, seq := range []uint32{1000, 1001, 1002} {
ledger := ledgers[seq]
assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
assertXdrEquals(t, transactions[i], ledger.Transaction)
assertXdrEquals(t, results[i], ledger.TransactionResult)
}

wg.Done()
}()
}

require.Eventually(t, func() bool { wg.Wait(); return true }, time.Minute, time.Second)
}

func TestCacheDeadlocks(t *testing.T) {
archive := MustConnect("fmock://test", ArchiveOptions{
CheckpointFrequency: 64,
CachePath: cachePath,
})
makeFakeArchive(t, archive)
_, err := archive.GetLedgers(1000, 1002)
require.Error(t, err)
}

func makeFakeArchive(t *testing.T, archive *Archive) (
[]xdr.LedgerHeaderHistoryEntry,
[]xdr.TransactionHistoryEntry,
[]xdr.TransactionHistoryResultEntry,
) {
ledgerHeaders := []xdr.LedgerHeaderHistoryEntry{
{
Hash: xdr.Hash{1},
@@ -646,36 +736,5 @@ func TestGetLedgers(t *testing.T) {
[]xdrEntry{results[0], results[1], results[2]},
)

-	stats := archive.GetStats()[0]
-	ledgers, err := archive.GetLedgers(1000, 1002)
-
-	assert.NoError(t, err)
-	assert.Len(t, ledgers, 3)
-	// it started at 1, incurred 6 requests total, 3 queries, 3 downloads
-	assert.EqualValues(t, 7, stats.GetRequests())
-	// started 0, incurred 3 file downloads
-	assert.EqualValues(t, 3, stats.GetDownloads())
-	for i, seq := range []uint32{1000, 1001, 1002} {
-		ledger := ledgers[seq]
-		assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
-		assertXdrEquals(t, transactions[i], ledger.Transaction)
-		assertXdrEquals(t, results[i], ledger.TransactionResult)
-	}
-
-	// Repeat the same check but ensure the cache was used
-	ledgers, err = archive.GetLedgers(1000, 1002) // all cached
-	assert.NoError(t, err)
-	assert.Len(t, ledgers, 3)
-
-	// downloads should not change because of the cache
-	assert.EqualValues(t, 3, stats.GetDownloads())
-	// but requests increase because of 3 fetches to categories
-	assert.EqualValues(t, 10, stats.GetRequests())
-	assert.EqualValues(t, 3, stats.GetCacheHits())
-	for i, seq := range []uint32{1000, 1001, 1002} {
-		ledger := ledgers[seq]
-		assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
-		assertXdrEquals(t, transactions[i], ledger.Transaction)
-		assertXdrEquals(t, results[i], ledger.TransactionResult)
-	}
return ledgerHeaders, transactions, results
}