Skip to content

Commit

Permalink
historyarchive: Add round-robin, error-resilience, and back-off to th…
Browse files Browse the repository at this point in the history
…e `ArchivePool` (#5224)

* Perform round robin w/ individual backoff
* Remove transient retry test from checkpoints
* go.mod fixups with a tidy run
  • Loading branch information
Shaptic authored Mar 7, 2024
1 parent 742b367 commit ab3a926
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 125 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ require (
gopkg.in/tylerb/graceful.v1 v1.2.15
)

require golang.org/x/sync v0.6.0
require (
github.com/cenkalti/backoff/v4 v4.2.1
golang.org/x/sync v0.6.0
)

require (
cloud.google.com/go/compute v1.23.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENU
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/buger/goreplay v1.3.2 h1:MFAStZZCsHMPeN5xJ11rhUtV4ZctFRgzSHTfWSWOJsg=
github.com/buger/goreplay v1.3.2/go.mod h1:EyAKHxJR6K6phd0NaoPETSDbJRB/ogIw3Y15UlSbVBM=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
Expand Down
225 changes: 170 additions & 55 deletions historyarchive/archive_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,72 @@
package historyarchive

import (
"context"
"math/rand"
"time"

"github.com/stellar/go/support/errors"
"github.com/pkg/errors"
log "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

backoff "github.com/cenkalti/backoff/v4"
)

// A ArchivePool is just a collection of `ArchiveInterface`s so that we can
// An ArchivePool is just a collection of `ArchiveInterface`s so that we can
// distribute requests fairly throughout the pool.
type ArchivePool []ArchiveInterface
type ArchivePool struct {
backoff backoff.BackOff
pool []ArchiveInterface
curr int
}

// NewArchivePool tries connecting to each of the provided history archive URLs,
// returning a pool of valid archives.
//
// If none of the archives work, this returns the error message of the last
// failed archive. Note that the errors for each individual archive are hard to
// track if there's success overall.
func NewArchivePool(archiveURLs []string, opts ArchiveOptions) (ArchivePool, error) {
func NewArchivePool(archiveURLs []string, opts ArchiveOptions) (ArchiveInterface, error) {
return NewArchivePoolWithBackoff(
archiveURLs,
opts,
backoff.WithMaxRetries(backoff.NewConstantBackOff(250*time.Millisecond), 3),
)
}

func NewArchivePoolWithBackoff(archiveURLs []string, opts ArchiveOptions, strategy backoff.BackOff) (ArchiveInterface, error) {
if len(archiveURLs) <= 0 {
return nil, errors.New("No history archives provided")
}

var lastErr error = nil
ap := ArchivePool{
pool: make([]ArchiveInterface, 0, len(archiveURLs)),
backoff: strategy,
}
var lastErr error

// Try connecting to all of the listed archives, but only store valid ones.
var validArchives ArchivePool
for _, url := range archiveURLs {
archive, err := Connect(
url,
opts,
)

archive, err := Connect(url, opts)
if err != nil {
lastErr = errors.Wrapf(err, "Error connecting to history archive (%s)", url)
continue
}

validArchives = append(validArchives, archive)
ap.pool = append(ap.pool, archive)
}

if len(validArchives) == 0 {
if len(ap.pool) == 0 {
return nil, lastErr
}

return validArchives, nil
ap.curr = rand.Intn(len(ap.pool)) // don't necessarily start at zero
return &ap, nil
}

func (pa ArchivePool) GetStats() []ArchiveStats {
func (pa *ArchivePool) GetStats() []ArchiveStats {
stats := []ArchiveStats{}
for _, archive := range pa {
for _, archive := range pa.pool {
stats = append(stats, archive.GetStats()...)
}
return stats
Expand All @@ -62,80 +79,178 @@ func (pa ArchivePool) GetStats() []ArchiveStats {
// Ensure the pool conforms to the ArchiveInterface
var _ ArchiveInterface = &ArchivePool{}

// Below are the ArchiveInterface method implementations.
//
// These are helpers to round-robin calls through archives.
//

func (pa ArchivePool) GetAnyArchive() ArchiveInterface {
return pa[rand.Intn(len(pa))]
// getNextArchive statefully round-robins through the pool
func (pa *ArchivePool) getNextArchive() ArchiveInterface {
// Round-robin through the archives
pa.curr = (pa.curr + 1) % len(pa.pool)
return pa.pool[pa.curr]
}

func (pa ArchivePool) GetPathHAS(path string) (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetPathHAS(path)
// runRoundRobin is a helper method that will run a particular action on every
// archive in the pool until it succeeds or the pool is exhausted (whichever
// comes first), repeating with a constant 500ms backoff.
func (pa *ArchivePool) runRoundRobin(runner func(ai ArchiveInterface) error) error {
return backoff.Retry(func() error {
var err error
ai := pa.getNextArchive()
if err = runner(ai); err == nil {
return nil
}

if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
return backoff.Permanent(err)
}

// Intentionally avoid logging context errors
if stats := ai.GetStats(); len(stats) > 0 {
log.WithField("error", err).Warnf(
"Encountered an error with archive '%s'",
stats[0].GetBackendName())
}

return err
}, pa.backoff)
}

//
// Below are the ArchiveInterface method implementations.
//

func (pa *ArchivePool) GetPathHAS(path string) (HistoryArchiveState, error) {
has := HistoryArchiveState{}
err := pa.runRoundRobin(func(ai ArchiveInterface) error {
var innerErr error
has, innerErr = ai.GetPathHAS(path)
return innerErr
})
return has, err
}

func (pa ArchivePool) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutPathHAS(path, has, opts)
func (pa *ArchivePool) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
return pa.runRoundRobin(func(ai ArchiveInterface) error {
return ai.PutPathHAS(path, has, opts)
})
}

func (pa ArchivePool) BucketExists(bucket Hash) (bool, error) {
return pa.GetAnyArchive().BucketExists(bucket)
func (pa *ArchivePool) BucketExists(bucket Hash) (bool, error) {
status := false
return status, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
status, err = ai.BucketExists(bucket)
return err
})
}

func (pa ArchivePool) BucketSize(bucket Hash) (int64, error) {
return pa.GetAnyArchive().BucketSize(bucket)
func (pa *ArchivePool) BucketSize(bucket Hash) (int64, error) {
var bsize int64
return bsize, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
bsize, err = ai.BucketSize(bucket)
return err
})
}

func (pa ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk)
func (pa *ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
var ok bool
return ok, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
ok, err = ai.CategoryCheckpointExists(cat, chk)
return err
})
}

func (pa ArchivePool) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) {
return pa.GetAnyArchive().GetLedgerHeader(chk)
func (pa *ArchivePool) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) {
var entry xdr.LedgerHeaderHistoryEntry
return entry, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
entry, err = ai.GetLedgerHeader(chk)
return err
})
}

func (pa ArchivePool) GetRootHAS() (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetRootHAS()
func (pa *ArchivePool) GetRootHAS() (HistoryArchiveState, error) {
var state HistoryArchiveState
return state, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
state, err = ai.GetRootHAS()
return err
})
}

func (pa ArchivePool) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) {
return pa.GetAnyArchive().GetLedgers(start, end)
func (pa *ArchivePool) GetLedgers(start, end uint32) (map[uint32]*Ledger, error) {
var dict map[uint32]*Ledger

return dict, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
dict, err = ai.GetLedgers(start, end)
return err
})
}

func (pa ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) {
return pa.GetAnyArchive().GetCheckpointHAS(chk)
func (pa *ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error) {
var state HistoryArchiveState
return state, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
state, err = ai.GetCheckpointHAS(chk)
return err
})
}

func (pa ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutCheckpointHAS(chk, has, opts)
func (pa *ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error {
return pa.runRoundRobin(func(ai ArchiveInterface) error {
return ai.PutCheckpointHAS(chk, has, opts)
})
}

func (pa ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error {
return pa.GetAnyArchive().PutRootHAS(has, opts)
func (pa *ArchivePool) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error {
return pa.runRoundRobin(func(ai ArchiveInterface) error {
return ai.PutRootHAS(has, opts)
})
}

func (pa ArchivePool) ListBucket(dp DirPrefix) (chan string, chan error) {
return pa.GetAnyArchive().ListBucket(dp)
func (pa *ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
var stream *XdrStream
return stream, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
stream, err = ai.GetXdrStreamForHash(hash)
return err
})
}

func (pa ArchivePool) ListAllBuckets() (chan string, chan error) {
return pa.GetAnyArchive().ListAllBuckets()
func (pa *ArchivePool) GetXdrStream(pth string) (*XdrStream, error) {
var stream *XdrStream
return stream, pa.runRoundRobin(func(ai ArchiveInterface) error {
var err error
stream, err = ai.GetXdrStream(pth)
return err
})
}

func (pa ArchivePool) ListAllBucketHashes() (chan Hash, chan error) {
return pa.GetAnyArchive().ListAllBucketHashes()
func (pa *ArchivePool) GetCheckpointManager() CheckpointManager {
return pa.getNextArchive().GetCheckpointManager()
}

func (pa ArchivePool) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) {
return pa.GetAnyArchive().ListCategoryCheckpoints(cat, pth)
//
// The channel-based methods do not have automatic retries.
//

func (pa *ArchivePool) ListBucket(dp DirPrefix) (chan string, chan error) {
return pa.getNextArchive().ListBucket(dp)
}

func (pa ArchivePool) GetXdrStreamForHash(hash Hash) (*XdrStream, error) {
return pa.GetAnyArchive().GetXdrStreamForHash(hash)
func (pa *ArchivePool) ListAllBuckets() (chan string, chan error) {
return pa.getNextArchive().ListAllBuckets()
}

func (pa ArchivePool) GetXdrStream(pth string) (*XdrStream, error) {
return pa.GetAnyArchive().GetXdrStream(pth)
func (pa *ArchivePool) ListAllBucketHashes() (chan Hash, chan error) {
return pa.getNextArchive().ListAllBucketHashes()
}

func (pa ArchivePool) GetCheckpointManager() CheckpointManager {
return pa.GetAnyArchive().GetCheckpointManager()
func (pa *ArchivePool) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error) {
return pa.getNextArchive().ListCategoryCheckpoints(cat, pth)
}
Loading

0 comments on commit ab3a926

Please sign in to comment.