diff --git a/go.mod b/go.mod
index f3d11b5abc..1ca1ea801c 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,7 @@ require (
 	github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
 	github.com/aws/aws-sdk-go v1.45.26
 	github.com/creachadair/jrpc2 v1.1.0
+	github.com/djherbis/fscache v0.10.1
 	github.com/elazarl/go-bindata-assetfs v1.0.1
 	github.com/getsentry/raven-go v0.2.0
 	github.com/go-chi/chi v4.1.2+incompatible
@@ -91,6 +92,8 @@ require (
 	golang.org/x/tools v0.14.0 // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20231211222908-989df2bf70f3 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
+	gopkg.in/djherbis/atime.v1 v1.0.0 // indirect
+	gopkg.in/djherbis/stream.v1 v1.3.1 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 )
diff --git a/go.sum b/go.sum
index 9ca895487d..6e4158f9b7 100644
--- a/go.sum
+++ b/go.sum
@@ -104,6 +104,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/djherbis/fscache v0.10.1 h1:hDv+RGyvD+UDKyRYuLoVNbuRTnf2SrA2K3VyR1br9lk=
+github.com/djherbis/fscache v0.10.1/go.mod h1:yyPYtkNnnPXsW+81lAcQS6yab3G2CRfnPLotBvtbf0c=
 github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
@@ -814,6 +816,10 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/djherbis/atime.v1 v1.0.0 h1:eMRqB/JrLKocla2PBPKgQYg/p5UG4L6AUAs92aP7F60=
+gopkg.in/djherbis/atime.v1 v1.0.0/go.mod h1:hQIUStKmJfvf7xdh/wtK84qe+DsTV5LnA9lzxxtPpJ8=
+gopkg.in/djherbis/stream.v1 v1.3.1 h1:uGfmsOY1qqMjQQphhRBSGLyA9qumJ56exkRu9ASTjCw=
+gopkg.in/djherbis/stream.v1 v1.3.1/go.mod h1:aEV8CBVRmSpLamVJfM903Npic1IKmb2qS30VAZ+sssg=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/gavv/httpexpect.v1 v1.0.0-20170111145843-40724cf1e4a0 h1:r5ptJ1tBxVAeqw4CrYWhXIMr0SybY3CDHuIbCg5CFVw=
diff --git a/historyarchive/archive.go b/historyarchive/archive.go
index ed05a4130d..d52c41ec43 100644
--- a/historyarchive/archive.go
+++ b/historyarchive/archive.go
@@ -11,12 +11,15 @@ import (
 	"fmt"
 	"io"
 	"net/url"
+	"os"
 	"path"
 	"regexp"
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
+	fscache "github.com/djherbis/fscache"
 	log "github.com/sirupsen/logrus"
 
 	"github.com/stellar/go/support/errors"
@@ -38,6 +41,8 @@ type CommandOptions struct {
 }
 
 type ArchiveOptions struct {
+	storage.ConnectOptions
+
 	// NetworkPassphrase defines the expected network of history archive. It is
 	// checked when getting HAS. If network passphrase does not match, error is
 	// returned.
@@ -45,9 +50,8 @@ type ArchiveOptions struct {
 	// CheckpointFrequency is the number of ledgers between checkpoints
 	// if unset, DefaultCheckpointFrequency will be used
 	CheckpointFrequency uint32
-	storage.ConnectOptions
-	// CacheConfig controls how/if bucket files are cached on the disk.
-	CacheConfig CacheOptions
+	// CachePath controls where/if bucket files are cached on the disk.
+	CachePath string
 }
 
 type Ledger struct {
@@ -104,8 +108,15 @@ type Archive struct {
 	checkpointManager CheckpointManager
 
 	backend storage.Storage
-	cache *ArchiveBucketCache
 	stats archiveStats
+	cache *archiveBucketCache
+}
+
+type archiveBucketCache struct {
+	fscache.Cache
+
+	path string
+	sizes sync.Map
 }
 
 func (arch *Archive) GetStats() []ArchiveStats {
@@ -383,23 +394,79 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
 }
 
 func (a *Archive) cachedGet(pth string) (io.ReadCloser, error) {
-	if a.cache != nil {
-		rdr, foundInCache, err := a.cache.GetFile(pth, a.backend)
-		if !foundInCache {
-			a.stats.incrementDownloads()
-		} else {
-			a.stats.incrementCacheHits()
-		}
-		if err == nil {
-			return rdr, nil
+	if a.cache == nil {
+		a.stats.incrementDownloads()
+		return a.backend.GetFile(pth)
+	}
+
+	L := log.WithField("path", pth).WithField("cache", a.cache.path)
+
+	rdr, wrtr, err := a.cache.Get(pth)
+	if err != nil {
+		L.WithError(err).
+			WithField("remove", a.cache.Remove(pth)).
+			Warn("On-disk cache retrieval failed")
+		a.stats.incrementDownloads()
+		return a.backend.GetFile(pth)
+	}
+
+	// If a NEW key is being retrieved, Get returns a writer to which
+	// you're expected to write the upstream contents, as well as a
+	// reader that will read directly from it.
+	if wrtr != nil {
+		log.WithField("path", pth).Info("Caching file...")
+		a.stats.incrementDownloads()
+		upstreamReader, err := a.backend.GetFile(pth)
+		if err != nil {
+			writeErr := wrtr.Close()
+			readErr := rdr.Close()
+			removeErr := a.cache.Remove(pth)
+			// Execution order isn't guaranteed within a function call
+			// expression, so we close them in an explicit order first.
+			L.WithError(err).WithFields(log.Fields{
+				"write-close": writeErr,
+				"read-close": readErr,
+				"cache-rm": removeErr,
+			}).Warn("Download failed, purging from cache")
+			return nil, err
 		}
-		// If there's an error, retry with the uncached backend.
-		a.cache.Evict(pth)
+
+		// Start a goroutine to slurp up the upstream and feed
+		// it directly to the cache.
+		go func() {
+			written, err := io.Copy(wrtr, upstreamReader)
+			writeErr := wrtr.Close()
+			readErr := upstreamReader.Close()
+			fields := log.Fields{
+				"wr-close": writeErr,
+				"rd-close": readErr,
+			}
+
+			if err != nil {
+				L.WithFields(fields).WithError(err).
+					Warn("Failed to download and cache file")
+
+				// Removal must happen *after* the handles close.
+				if removalErr := a.cache.Remove(pth); removalErr != nil {
+					L.WithError(removalErr).Warn("Removing cached file failed")
+				}
+			} else {
+				L.WithFields(fields).Infof("Cached %dKiB file", written/1024)
+
+				// Track how much bandwidth we've saved from caching by
+				// recording the size of the file we just downloaded.
+				a.cache.sizes.Store(pth, written)
+			}
+		}()
+	} else {
+		// Best-effort check to track bandwidth metrics
+		if written, found := a.cache.sizes.Load(pth); found {
+			a.stats.incrementCacheBandwidth(written.(int64))
+		}
+		a.stats.incrementCacheHits()
 	}
-	a.stats.incrementDownloads()
-	return a.backend.GetFile(pth)
+
+	return rdr, nil
 }
 
 func (a *Archive) cachedExists(pth string) (bool, error) {
@@ -439,13 +506,30 @@ func Connect(u string, opts ArchiveOptions) (*Archive, error) {
 		return &arch, err
 	}
 
-	if opts.CacheConfig.Cache {
-		cache, innerErr := MakeArchiveBucketCache(opts.CacheConfig)
-		if innerErr != nil {
-			return &arch, innerErr
+	if opts.CachePath != "" {
+		// Set up a <= ~10GiB LRU cache for history archive files
+		haunter := fscache.NewLRUHaunterStrategy(
+			fscache.NewLRUHaunter(0, 10<<30, time.Minute /* frequency check */),
+		)
+
+		// Wipe any existing cache on startup
+		os.RemoveAll(opts.CachePath)
+		fs, err := fscache.NewFs(opts.CachePath, 0755 /* drwxr-xr-x */)
+
+		if err != nil {
+			return &arch, errors.Wrapf(err,
+				"creating cache at '%s' with mode 0755 failed",
+				opts.CachePath)
+		}
+
+		cache, err := fscache.NewCacheWithHaunter(fs, haunter)
+		if err != nil {
+			return &arch, errors.Wrapf(err,
+				"creating cache at '%s' failed",
+				opts.CachePath)
 		}
-		arch.cache = cache
+
+		arch.cache = &archiveBucketCache{cache, opts.CachePath, sync.Map{}}
 	}
 
 	arch.stats = archiveStats{backendName: u}
@@ -467,6 +551,8 @@ func ConnectBackend(u string, opts storage.ConnectOptions) (storage.Storage, err
 	if parsed.Scheme == "mock" {
 		backend = makeMockBackend()
+	} else if parsed.Scheme == "fmock" {
+		backend = makeFailingMockBackend()
 	} else {
 		backend, err = storage.ConnectBackend(u, opts)
 	}
diff --git a/historyarchive/archive_test.go b/historyarchive/archive_test.go
index de34c36f68..e5be8febbb 100644
--- a/historyarchive/archive_test.go
+++ b/historyarchive/archive_test.go
@@ -18,13 +18,18 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stellar/go/support/storage"
 	"github.com/stellar/go/xdr"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
+var cachePath = filepath.Join(os.TempDir(), "history-archive-test-cache")
+
 func GetTestS3Archive() *Archive {
 	mx := big.NewInt(0xffffffff)
 	r, e := rand.Int(rand.Reader, mx)
@@ -49,13 +54,10 @@ func GetTestS3Archive() *Archive {
 }
 
 func GetTestMockArchive() *Archive {
-	return MustConnect("mock://test",
-		ArchiveOptions{CheckpointFrequency: 64,
-			CacheConfig: CacheOptions{
-				Cache: true,
-				Path: filepath.Join(os.TempDir(), "history-archive-test-cache"),
-				MaxFiles: 5,
-			}})
+	return MustConnect("mock://test", ArchiveOptions{
+		CheckpointFrequency: 64,
+		CachePath: cachePath,
+	})
 }
 
 var tmpdirs []string
@@ -563,7 +565,95 @@ func TestGetLedgers(t *testing.T) {
 	assert.Equal(t, uint32(1), archive.GetStats()[0].GetRequests())
 	assert.Equal(t, uint32(0), archive.GetStats()[0].GetDownloads())
 	assert.EqualError(t, err, "checkpoint 1023 is not published")
+	ledgerHeaders, transactions, results := makeFakeArchive(t, archive)
+
+	stats := archive.GetStats()[0]
+	ledgers, err := archive.GetLedgers(1000, 1002)
+
+	assert.NoError(t, err)
+	assert.Len(t, ledgers, 3)
+	// it started at 1, incurred 6 requests total: 3 queries + 3 downloads
+	assert.EqualValues(t, 7, stats.GetRequests())
+	// started at 0, incurred 3 file downloads
+	assert.EqualValues(t, 3, stats.GetDownloads())
+	assert.EqualValues(t, 0, stats.GetCacheHits())
+	for i, seq := range []uint32{1000, 1001, 1002} {
+		ledger := ledgers[seq]
+		assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
+		assertXdrEquals(t, transactions[i], ledger.Transaction)
+		assertXdrEquals(t, results[i], ledger.TransactionResult)
+	}
+
+	// Repeat the same check but ensure the cache was used
+	ledgers, err = archive.GetLedgers(1000, 1002) // all cached
+	assert.NoError(t, err)
+	assert.Len(t, ledgers, 3)
+
+	// downloads should not change because of the cache
+	assert.EqualValues(t, 3, stats.GetDownloads())
+	// but requests increase because of 3 fetches to categories
+	assert.EqualValues(t, 10, stats.GetRequests())
+	assert.EqualValues(t, 3, stats.GetCacheHits())
+	for i, seq := range []uint32{1000, 1001, 1002} {
+		ledger := ledgers[seq]
+		assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
+		assertXdrEquals(t, transactions[i], ledger.Transaction)
+		assertXdrEquals(t, results[i], ledger.TransactionResult)
+	}
+
+	// remove the cached files without informing the cache and ensure it fills up again
+	require.NoError(t, os.RemoveAll(cachePath))
+	ledgers, err = archive.GetLedgers(1000, 1002) // uncached, refetch
+	assert.NoError(t, err)
+	assert.Len(t, ledgers, 3)
+
+	// downloads should increase again
+	assert.EqualValues(t, 6, stats.GetDownloads())
+	assert.EqualValues(t, 3, stats.GetCacheHits())
+}
+
+func TestStressfulGetLedgers(t *testing.T) {
+	archive := GetTestMockArchive()
+	ledgerHeaders, transactions, results := makeFakeArchive(t, archive)
+
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+
+		go func() {
+			time.Sleep(time.Millisecond) // encourage interleaved execution
+			ledgers, err := archive.GetLedgers(1000, 1002)
+			assert.NoError(t, err)
+			assert.Len(t, ledgers, 3)
+			for i, seq := range []uint32{1000, 1001, 1002} {
+				ledger := ledgers[seq]
+				assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
+				assertXdrEquals(t, transactions[i], ledger.Transaction)
+				assertXdrEquals(t, results[i], ledger.TransactionResult)
+			}
+
+			wg.Done()
+		}()
+	}
+
+	require.Eventually(t, func() bool { wg.Wait(); return true }, time.Minute, time.Second)
+}
+
+func TestCacheDeadlocks(t *testing.T) {
+	archive := MustConnect("fmock://test", ArchiveOptions{
+		CheckpointFrequency: 64,
+		CachePath: cachePath,
+	})
+	makeFakeArchive(t, archive)
+	_, err := archive.GetLedgers(1000, 1002)
+	require.Error(t, err)
+}
+
+func makeFakeArchive(t *testing.T, archive *Archive) (
+	[]xdr.LedgerHeaderHistoryEntry,
+	[]xdr.TransactionHistoryEntry,
+	[]xdr.TransactionHistoryResultEntry,
+) {
 	ledgerHeaders := []xdr.LedgerHeaderHistoryEntry{
 		{
 			Hash: xdr.Hash{1},
@@ -646,36 +736,5 @@ func TestGetLedgers(t *testing.T) {
 		[]xdrEntry{results[0], results[1], results[2]},
 	)
 
-	stats := archive.GetStats()[0]
-	ledgers, err := archive.GetLedgers(1000, 1002)
-
-	assert.NoError(t, err)
-	assert.Len(t, ledgers, 3)
-	// it started at 1, incurred 6 requests total, 3 queries, 3 downloads
-	assert.EqualValues(t, 7, stats.GetRequests())
-	// started 0, incurred 3 file downloads
-	assert.EqualValues(t, 3, stats.GetDownloads())
-	for i, seq := range []uint32{1000, 1001, 1002} {
-		ledger := ledgers[seq]
-		assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
-		assertXdrEquals(t, transactions[i], ledger.Transaction)
-		assertXdrEquals(t, results[i], ledger.TransactionResult)
-	}
-
-	// Repeat the same check but ensure the cache was used
-	ledgers, err = archive.GetLedgers(1000, 1002) // all cached
-	assert.NoError(t, err)
-	assert.Len(t, ledgers, 3)
-
-	// downloads should not change because of the cache
-	assert.EqualValues(t, 3, stats.GetDownloads())
-	// but requests increase because of 3 fetches to categories
-	assert.EqualValues(t, 10, stats.GetRequests())
-	assert.EqualValues(t, 3, stats.GetCacheHits())
-	for i, seq := range []uint32{1000, 1001, 1002} {
-		ledger := ledgers[seq]
-		assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
-		assertXdrEquals(t, transactions[i], ledger.Transaction)
-		assertXdrEquals(t, results[i], ledger.TransactionResult)
-	}
+	return ledgerHeaders, transactions, results
 }
diff --git a/historyarchive/failing_mock_archive.go b/historyarchive/failing_mock_archive.go
new file mode 100644
index 0000000000..815b575648
--- /dev/null
+++ b/historyarchive/failing_mock_archive.go
@@ -0,0 +1,82 @@
+package historyarchive
+
+import (
+	"io"
+
+	"github.com/stellar/go/support/errors"
+	"github.com/stellar/go/support/storage"
+)
+
+// FailingMockArchiveBackend is a mocking backend that will fail only when you
+// try to read, but otherwise behaves like MockArchiveBackend.
+type FailingMockArchiveBackend struct {
+	files map[string][]byte
+}
+
+func (b *FailingMockArchiveBackend) Exists(pth string) (bool, error) {
+	_, ok := b.files[pth]
+	return ok, nil
+}
+
+func (b *FailingMockArchiveBackend) Size(pth string) (int64, error) {
+	f, ok := b.files[pth]
+	sz := int64(0)
+	if ok {
+		sz = int64(len(f))
+	}
+	return sz, nil
+}
+
+func (b *FailingMockArchiveBackend) GetFile(pth string) (io.ReadCloser, error) {
+	data, ok := b.files[pth]
+	if !ok {
+		return nil, errors.New("file does not exist")
+	}
+
+	fr := FakeReader{}
+	fr.data = make([]byte, len(data))
+	copy(fr.data[:], data[:])
+	return &fr, nil
+}
+
+func (b *FailingMockArchiveBackend) PutFile(pth string, in io.ReadCloser) error {
+	buf, e := io.ReadAll(in)
+	if e != nil {
+		return e
+	}
+	b.files[pth] = buf
+	return nil
+}
+
+func (b *FailingMockArchiveBackend) ListFiles(pth string) (chan string, chan error) {
+	return nil, nil
+}
+
+func (b *FailingMockArchiveBackend) CanListFiles() bool {
+	return false
+}
+
+func (b *FailingMockArchiveBackend) Close() error {
+	b.files = make(map[string][]byte)
+	return nil
+}
+
+func makeFailingMockBackend() storage.Storage {
+	b := new(FailingMockArchiveBackend)
+	b.Close()
+	return b
+}
+
+type FakeReader struct {
+	data []byte
+}
+
+func (fr *FakeReader) Read(b []byte) (int, error) {
+	return 0, io.ErrClosedPipe
+}
+
+func (fr *FakeReader) Close() error {
+	return nil
+}
+
+var _ io.ReadCloser = &FakeReader{}
diff --git a/historyarchive/mocks.go b/historyarchive/mocks.go
index fe497ec36e..fa5716e5de 100644
--- a/historyarchive/mocks.go
+++ b/historyarchive/mocks.go
@@ -137,3 +137,8 @@ func (m *MockArchiveStats) GetCacheHits() uint32 {
 	a := m.Called()
 	return a.Get(0).(uint32)
 }
+
+func (m *MockArchiveStats) GetCacheBandwidth() uint64 {
+	a := m.Called()
+	return a.Get(0).(uint64)
+}
diff --git a/historyarchive/stats.go b/historyarchive/stats.go
index c182853d1b..6dbf8ceed2 100644
--- a/historyarchive/stats.go
+++ b/historyarchive/stats.go
@@ -8,6 +8,7 @@ type archiveStats struct {
 	fileDownloads atomic.Uint32
 	fileUploads atomic.Uint32
 	cacheHits atomic.Uint32
+	cacheBw atomic.Uint64
 	backendName string
 }
 
@@ -16,6 +17,7 @@ type ArchiveStats interface {
 	GetDownloads() uint32
 	GetUploads() uint32
 	GetCacheHits() uint32
+	GetCacheBandwidth() uint64
 	GetBackendName() string
 }
 
@@ -37,6 +39,10 @@ func (as *archiveStats) incrementCacheHits() {
 	as.cacheHits.Add(1)
 }
 
+func (as *archiveStats) incrementCacheBandwidth(bytes int64) {
+	as.cacheBw.Add(uint64(bytes))
+}
+
 func (as *archiveStats) GetRequests() uint32 {
 	return as.requests.Load()
 }
@@ -55,3 +61,6 @@ func (as *archiveStats) GetBackendName() string {
 func (as *archiveStats) GetCacheHits() uint32 {
 	return as.cacheHits.Load()
 }
+func (as *archiveStats) GetCacheBandwidth() uint64 {
+	return as.cacheBw.Load()
+}
diff --git a/historyarchive/xdrstream.go b/historyarchive/xdrstream.go
index de8efc3bb6..313c600f8b 100644
--- a/historyarchive/xdrstream.go
+++ b/historyarchive/xdrstream.go
@@ -107,7 +107,7 @@ func (x *XdrStream) ExpectedHash() ([sha256.Size]byte, bool) {
 func (x *XdrStream) Close() error {
 	if x.validateHash {
 		// Read all remaining data from rdr
-		_, err := io.Copy(ioutil.Discard, x.rdr)
+		_, err := io.Copy(io.Discard, x.rdr)
 		if err != nil {
 			// close the internal readers to avoid memory leaks
 			x.closeReaders()
diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md
index fc2b046a57..0e1b116923 100644
--- a/services/horizon/CHANGELOG.md
+++ b/services/horizon/CHANGELOG.md
@@ -3,7 +3,11 @@ All notable changes to this project will be documented in this file. This
 project adheres to [Semantic Versioning](http://semver.org/).
 
-## Unreleased
+## 2.28.2
+
+### Fixed
+- History archive caching would cause file corruption in certain environments [5197](https://github.com/stellar/go/pull/5197)
+- Server error in claimable balance API when claimant, asset and cursor query params are supplied [5200](https://github.com/stellar/go/pull/5200)
 
 ## 2.28.1
diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go
index 07bbf975fa..7d14ca314e 100644
--- a/services/horizon/cmd/db.go
+++ b/services/horizon/cmd/db.go
@@ -407,6 +407,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
 	ingestConfig := ingest.Config{
 		NetworkPassphrase: config.NetworkPassphrase,
 		HistoryArchiveURLs: config.HistoryArchiveURLs,
+		HistoryArchiveCaching: config.HistoryArchiveCaching,
 		CheckpointFrequency: config.CheckpointFrequency,
 		ReingestEnabled: true,
 		MaxReingestRetries: int(retries),
diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go
index 3833dba7fd..18452dc74a 100644
--- a/services/horizon/cmd/ingest.go
+++ b/services/horizon/cmd/ingest.go
@@ -128,6 +128,7 @@ var ingestVerifyRangeCmd = &cobra.Command{
 			NetworkPassphrase: globalConfig.NetworkPassphrase,
 			HistorySession: horizonSession,
 			HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
+			HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
 			CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
 			CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
 			CheckpointFrequency: globalConfig.CheckpointFrequency,
@@ -210,6 +211,7 @@ var ingestStressTestCmd = &cobra.Command{
 			NetworkPassphrase: globalConfig.NetworkPassphrase,
 			HistorySession: horizonSession,
 			HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
+			HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
 			RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
 			CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
 			CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
@@ -349,6 +351,7 @@ var ingestBuildStateCmd = &cobra.Command{
 			NetworkPassphrase: globalConfig.NetworkPassphrase,
 			HistorySession: horizonSession,
 			HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
+			HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
 			CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
 			CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
 			CheckpointFrequency: globalConfig.CheckpointFrequency,
diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go
index 8fb31075b8..54f843b810 100644
--- a/services/horizon/internal/config.go
+++ b/services/horizon/internal/config.go
@@ -27,6 +27,7 @@ type Config struct {
 	CaptiveCoreStoragePath string
 	CaptiveCoreReuseStoragePath bool
 	CaptiveCoreConfigUseDB bool
+	HistoryArchiveCaching bool
 
 	StellarCoreURL string
 
diff --git a/services/horizon/internal/db2/history/claimable_balances.go b/services/horizon/internal/db2/history/claimable_balances.go
index 5490bef11c..d45780a4c0 100644
--- a/services/horizon/internal/db2/history/claimable_balances.go
+++ b/services/horizon/internal/db2/history/claimable_balances.go
@@ -67,17 +67,17 @@ func applyClaimableBalancesQueriesCursor(sql sq.SelectBuilder, lCursor int64, rC
 	case db2.OrderAscending:
 		if hasPagedLimit {
 			sql = sql.
-				Where(sq.Expr("(last_modified_ledger, id) > (?, ?)", lCursor, rCursor))
+				Where(sq.Expr("(cb.last_modified_ledger, cb.id) > (?, ?)", lCursor, rCursor))
 		}
-		sql = sql.OrderBy("last_modified_ledger asc, id asc")
+		sql = sql.OrderBy("cb.last_modified_ledger asc, cb.id asc")
 	case db2.OrderDescending:
 		if hasPagedLimit {
 			sql = sql.
-				Where(sq.Expr("(last_modified_ledger, id) < (?, ?)", lCursor, rCursor))
+				Where(sq.Expr("(cb.last_modified_ledger, cb.id) < (?, ?)", lCursor, rCursor))
 		}
-		sql = sql.OrderBy("last_modified_ledger desc, id desc")
+		sql = sql.OrderBy("cb.last_modified_ledger desc, cb.id desc")
 	default:
 		return sql, errors.Errorf("invalid order: %s", order)
 	}
diff --git a/services/horizon/internal/db2/history/claimable_balances_test.go b/services/horizon/internal/db2/history/claimable_balances_test.go
index ca32975c62..769ab3bc13 100644
--- a/services/horizon/internal/db2/history/claimable_balances_test.go
+++ b/services/horizon/internal/db2/history/claimable_balances_test.go
@@ -219,6 +219,188 @@ func TestFindClaimableBalancesByDestination(t *testing.T) {
 	tt.Assert.Len(cbs, 1)
 }
 
+func TestFindClaimableBalancesByCursor(t *testing.T) {
+	tt := test.Start(t)
+	defer tt.Finish()
+	test.ResetHorizonDB(t, tt.HorizonDB)
+	q := &Q{tt.HorizonSession()}
+
+	tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{}))
+	defer func() {
+		_ = q.Rollback()
+	}()
+
+	balanceInsertBuilder := q.NewClaimableBalanceBatchInsertBuilder()
+	claimantsInsertBuilder := q.NewClaimableBalanceClaimantBatchInsertBuilder()
+
+	dest1 := "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"
+	dest2 := "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"
+
+	sponsor1 := "GA25GQLHJU3LPEJXEIAXK23AWEA5GWDUGRSHTQHDFT6HXHVMRULSQJUJ"
+	sponsor2 := "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"
+
+	asset := xdr.MustNewCreditAsset("USD", dest1)
+	balanceID := xdr.ClaimableBalanceId{
+		Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
+		V0: &xdr.Hash{1, 2, 3},
+	}
+	id, err := xdr.MarshalHex(balanceID)
+	tt.Assert.NoError(err)
+	cBalance := ClaimableBalance{
+		BalanceID: id,
+		Claimants: []Claimant{
+			{
+				Destination: dest1,
+				Predicate: xdr.ClaimPredicate{
+					Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
+				},
+			},
+		},
+		Asset: asset,
+		LastModifiedLedger: 123,
+		Amount: 10,
+		Sponsor: null.StringFrom(sponsor1),
+	}
+
+	tt.Assert.NoError(balanceInsertBuilder.Add(cBalance))
+	tt.Assert.NoError(insertClaimants(claimantsInsertBuilder, cBalance))
+
+	balanceID = xdr.ClaimableBalanceId{
+		Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
+		V0: &xdr.Hash{3, 2, 1},
+	}
+	id, err = xdr.MarshalHex(balanceID)
+	tt.Assert.NoError(err)
+	cBalance = ClaimableBalance{
+		BalanceID: id,
+		Claimants: []Claimant{
+			{
+				Destination: dest1,
+				Predicate: xdr.ClaimPredicate{
+					Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
+				},
+			},
+			{
+				Destination: dest2,
+				Predicate: xdr.ClaimPredicate{
+					Type: xdr.ClaimPredicateTypeClaimPredicateUnconditional,
+				},
+			},
+		},
+		Asset: asset,
+		LastModifiedLedger: 300,
+		Amount: 10,
+		Sponsor: null.StringFrom(sponsor2),
+	}
+
+	tt.Assert.NoError(balanceInsertBuilder.Add(cBalance))
+	tt.Assert.NoError(insertClaimants(claimantsInsertBuilder, cBalance))
+
+	tt.Assert.NoError(claimantsInsertBuilder.Exec(tt.Ctx))
+	tt.Assert.NoError(balanceInsertBuilder.Exec(tt.Ctx))
+
+	query := ClaimableBalancesQuery{
+		PageQuery: db2.MustPageQuery("", false, "", 10),
+	}
+
+	cbs, err := q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 2)
+
+	order := "" // default is "asc"
+	// this validates the cb query with claimant and cb.id/ledger cursor parameters
+	query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+	query.Claimant = xdr.MustAddressPtr(dest1)
+	cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 1)
+	tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+	// this validates the cb query with claimant, asset, sponsor and cb.id/ledger cursor parameters
+	query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+	query.Claimant = xdr.MustAddressPtr(dest1)
+	query.Asset = &asset
+	query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+	cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 1)
+	tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+	// this validates the cb query with no claimant, asset, sponsor and cb.id/ledger cursor parameters
+	query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+	query.Claimant = nil
+	query.Asset = &asset
+	query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+	cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 1)
+	tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+	order = "desc"
+	// claimant and cb.id/ledger cursor parameters
+	query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 301, cbs[0].BalanceID), false, order, 10)
+	query.Claimant = xdr.MustAddressPtr(dest1)
+	cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 1)
+	tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+	// claimant, asset, sponsor and cb.id/ledger cursor parameters
+	query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 301, cbs[0].BalanceID), false, order, 10)
+	query.Claimant = xdr.MustAddressPtr(dest1)
+	query.Asset = &asset
+	query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+	cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 1)
+	tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+	// no claimant, asset, sponsor and cb.id/ledger cursor parameters
+	query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 301, cbs[0].BalanceID), false, order, 10)
+	query.Claimant = nil
+	query.Asset = &asset
+	query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+	cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 1)
+	tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+	order = "asc"
+	// claimant and cb.id/ledger cursor parameters
+	query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+	query.Claimant = xdr.MustAddressPtr(dest1)
+	cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 1)
+	tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+	// claimant, asset, sponsor and cb.id/ledger cursor parameters
+	query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+	query.Claimant = xdr.MustAddressPtr(dest1)
+	query.Asset = &asset
+	query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+	cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 1)
+	tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+
+	// no claimant, asset, sponsor and cb.id/ledger cursor parameters
+	query.PageQuery = db2.MustPageQuery(fmt.Sprintf("%v-%s", 150, cbs[0].BalanceID), false, order, 10)
+	query.Claimant = nil
+	query.Asset = &asset
+	query.Sponsor = xdr.MustAddressPtr(sponsor2)
+
+	cbs, err = q.GetClaimableBalances(tt.Ctx, query)
+	tt.Assert.NoError(err)
+	tt.Assert.Len(cbs, 1)
+	tt.Assert.Equal(dest2, cbs[0].Claimants[1].Destination)
+}
+
 func insertClaimants(claimantsInsertBuilder ClaimableBalanceClaimantBatchInsertBuilder, cBalance ClaimableBalance) error {
 	for _, claimant := range cBalance.Claimants {
 		claimant := ClaimableBalanceClaimant{
diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go
index eb229c65b2..87deb28c48 100644
--- a/services/horizon/internal/flags.go
+++ b/services/horizon/internal/flags.go
@@ -51,6 +51,9 @@ const (
 	NetworkPassphraseFlagName = "network-passphrase"
 	// HistoryArchiveURLsFlagName is the command line flag for specifying the history archive URLs
 	HistoryArchiveURLsFlagName = "history-archive-urls"
+	// HistoryArchiveCachingFlagName is the command line flag for controlling whether or not there's
+	// an on-disk cache for history archive downloads
+	HistoryArchiveCachingFlagName = "history-archive-caching"
 	// NetworkFlagName is the command line flag for specifying the "network"
 	NetworkFlagName = "network"
 	// EnableIngestionFilteringFlagName is the command line flag for enabling the experimental ingestion filtering feature (now enabled by default)
@@ -236,11 +239,7 @@ func Flags() (*Config, support.ConfigOptions) {
 			OptType: types.Bool,
 			FlagDefault: true,
 			Required: false,
-			Usage: `when enabled, Horizon ingestion will instruct the captive
-	core invocation to use an external db url for ledger states rather than in memory(RAM).\n
-	Will result in several GB of space shifting out of RAM and to the external db persistence.\n
-	The external db url is determined by the presence of DATABASE parameter in the captive-core-config-path or\n
-	or if absent, the db will default to sqlite and the db file will be stored at location derived from captive-core-storage-path parameter.`,
+			Usage: `when enabled, Horizon ingestion will instruct the captive core invocation to use an external db url for ledger states rather than in memory (RAM). Will result in several GB of space shifting out of RAM and to the external db persistence. The external db url is determined by the presence of a DATABASE parameter in the captive-core-config-path; if absent, the db will default to sqlite and the db file will be stored at a location derived from the captive-core-storage-path parameter.`,
 			CustomSetValue: func(opt *support.ConfigOption) error {
 				if val := viper.GetBool(opt.Name); val {
 					config.CaptiveCoreConfigUseDB = val
@@ -372,6 +371,14 @@ func Flags() (*Config, support.ConfigOptions) {
 			Usage: "comma-separated list of stellar history archives to connect with",
 			UsedInCommands: IngestionCommands,
 		},
+		&support.ConfigOption{
+			Name: HistoryArchiveCachingFlagName,
+			ConfigKey: &config.HistoryArchiveCaching,
+			OptType: types.Bool,
+			FlagDefault: true,
+			Usage: "adds caching for history archive downloads (requires an additional 10GB of disk space on mainnet)",
+			UsedInCommands: IngestionCommands,
+		},
 		&support.ConfigOption{
 			Name: "port",
 			ConfigKey: &config.Port,
diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go
index e0c667b033..892868e5b9 100644
--- a/services/horizon/internal/ingest/fsm.go
+++ b/services/horizon/internal/ingest/fsm.go
@@ -595,6 +595,11 @@ func addHistoryArchiveStatsMetrics(s *system, stats []historyarchive.ArchiveStat
 				"source": historyServerStat.GetBackendName(),
 				"type": "cache_hits"}).
 			Add(float64(historyServerStat.GetCacheHits()))
+		s.Metrics().HistoryArchiveStatsCounter.
+			With(prometheus.Labels{
+				"source": historyServerStat.GetBackendName(),
+				"type": "cache_bandwidth"}).
+			Add(float64(historyServerStat.GetCacheBandwidth()))
 	}
 }
 
diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go
index 2726e02484..98acd68f33 100644
--- a/services/horizon/internal/ingest/main.go
+++ b/services/horizon/internal/ingest/main.go
@@ -88,8 +88,9 @@ type Config struct {
 	CaptiveCoreConfigUseDB bool
 	NetworkPassphrase string
 
-	HistorySession     db.SessionInterface
-	HistoryArchiveURLs []string
+	HistorySession        db.SessionInterface
+	HistoryArchiveURLs    []string
+	HistoryArchiveCaching bool
 
 	DisableStateVerification bool
 	EnableReapLookupTables bool
@@ -223,6 +224,11 @@ type system struct {
 func NewSystem(config Config) (System, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 
+	cachingPath := ""
+	if config.HistoryArchiveCaching {
+		cachingPath = path.Join(config.CaptiveCoreStoragePath, "bucket-cache")
+	}
+
 	archive, err := historyarchive.NewArchivePool(
 		config.HistoryArchiveURLs,
 		historyarchive.ArchiveOptions{
@@ -230,13 +236,9 @@ func NewSystem(config Config) (System, error) {
 			CheckpointFrequency: config.CheckpointFrequency,
 			ConnectOptions: storage.ConnectOptions{
 				Context: ctx,
-				UserAgent: fmt.Sprintf("horizon/%s golang/%s", apkg.Version(), runtime.Version())},
-			CacheConfig: historyarchive.CacheOptions{
-				Cache: true,
-				Path: path.Join(config.CaptiveCoreStoragePath, "bucket-cache"),
-				Log: log.WithField("subservice", "ha-cache"),
-				MaxFiles: 150,
+				UserAgent: fmt.Sprintf("horizon/%s golang/%s", apkg.Version(), runtime.Version()),
 			},
+			CachePath: cachingPath,
 		},
 	)
 	if err != nil {
diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go
index f1f8b2ce2a..985391883f 100644
--- a/services/horizon/internal/ingest/resume_state_test.go
+++ b/services/horizon/internal/ingest/resume_state_test.go
@@ -267,6 +267,7 @@ func (s *ResumeTestTestSuite) mockSuccessfulIngestion() {
 	mockStats.On("GetRequests").Return(uint32(0))
 	mockStats.On("GetUploads").Return(uint32(0))
 	mockStats.On("GetCacheHits").Return(uint32(0))
mockStats.On("GetCacheHits").Return(uint32(0)) + mockStats.On("GetCacheBandwidth").Return(uint64(0)) s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{mockStats}).Once() s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")). @@ -384,6 +385,7 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { mockStats.On("GetRequests").Return(uint32(0)) mockStats.On("GetUploads").Return(uint32(0)) mockStats.On("GetCacheHits").Return(uint32(0)) + mockStats.On("GetCacheBandwidth").Return(uint64(0)) s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{mockStats}).Once() // Reap lookup tables not executed @@ -434,6 +436,7 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { mockStats.On("GetRequests").Return(uint32(0)) mockStats.On("GetUploads").Return(uint32(0)) mockStats.On("GetCacheHits").Return(uint32(0)) + mockStats.On("GetCacheBandwidth").Return(uint64(0)) s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{mockStats}).Once() next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index d4b34f9f4d..60ba7b6c2a 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -97,6 +97,7 @@ func initIngester(app *App) { ), NetworkPassphrase: app.config.NetworkPassphrase, HistoryArchiveURLs: app.config.HistoryArchiveURLs, + HistoryArchiveCaching: app.config.HistoryArchiveCaching, CheckpointFrequency: app.config.CheckpointFrequency, StellarCoreURL: app.config.StellarCoreURL, CaptiveCoreBinaryPath: app.config.CaptiveCoreBinaryPath,