Skip to content

Commit

Permalink
Merge pull request #2 from rpcpool/serial_download
Browse files Browse the repository at this point in the history
Conduct serial downloading & add support for fetching specific snapshot
  • Loading branch information
linuskendall authored Jun 6, 2024
2 parents 5fdd1d0 + 5eb9fa3 commit caeefce
Show file tree
Hide file tree
Showing 20 changed files with 314 additions and 144 deletions.
25 changes: 0 additions & 25 deletions .github/workflows/golangci_lint.yml

This file was deleted.

17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,23 @@ Flags:
--tracker string URL to tracker API
```

## Snapshot fetching logic

### If full=true, incremental=true (default)

If the local ledger directory does not have the same full snapshot as the remote one, fetch the full snapshot; otherwise, skip full snapshot fetching.

If the local ledger does not have the latest incremental snapshot, fetch the latest incremental snapshot.

### If full=false, incremental=true

Fetch the latest incremental snapshot for the latest full snapshot that is available locally. If that local full snapshot is not the latest one, log a warning but do not fetch a newer full snapshot.

### If full=true, incremental=false

Fetch the latest full snapshot, unless it is already available in the local ledger and has the same hash.


## Architecture

### Snapshot management
Expand Down
244 changes: 162 additions & 82 deletions internal/cmd/fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net/http"
"os"
"os/signal"
"sort"
"time"

"github.com/spf13/cobra"
Expand All @@ -31,8 +30,10 @@ import (
"go.blockdaemon.com/solana/cluster-manager/internal/fetch"
"go.blockdaemon.com/solana/cluster-manager/internal/ledger"
"go.blockdaemon.com/solana/cluster-manager/internal/logger"
"go.blockdaemon.com/solana/cluster-manager/types"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/term"
"gopkg.in/resty.v1"
)

Expand All @@ -50,9 +51,11 @@ var (
trackerURL string
minSnapAge uint64
maxSnapAge uint64
baseSlot uint64
fullSnap bool
incrementalSnap bool
requestTimeout time.Duration
downloadTimeout time.Duration
parallelDownload bool
)

func init() {
Expand All @@ -63,12 +66,18 @@ func init() {
flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download <n> slots older than the newest")
flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)")
flags.DurationVar(&downloadTimeout, "download-timeout", 10*time.Minute, "Max time to try downloading in total")
flags.BoolVar(&parallelDownload, "parallel-download", false, "Download snapshot files in parallel or serially")
flags.Uint64Var(&baseSlot, "slot", 0, "Download snapshot for given slot (if available)")
flags.BoolVar(&fullSnap, "full", true, "Download full snapshot (if available)")
flags.BoolVar(&incrementalSnap, "incremental", true, "Download incremental snapshot (if available)")
}

func run() {
log := logger.GetLogger()

if !fullSnap && !incrementalSnap {
log.Fatal("Must specify at least one of --full or --incremental")
}

// Regardless which API we talk to, we want to cap time from request to response header.
// This defends against black holes and really slow servers.
// Download time (reading response body) is not affected.
Expand All @@ -87,101 +96,172 @@ func run() {
log.Fatal("Failed to check existing snapshots", zap.Error(err))
}

// Ask tracker for best snapshots.
// Get a specific snapshot or "best snapshot" from tracker client
trackerClient := fetch.NewTrackerClientWithResty(
resty.New().
SetHostURL(trackerURL).
SetTimeout(requestTimeout),
)
remoteSnaps, err := trackerClient.GetBestSnapshots(ctx, -1)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}

// Decide what we want to do.
_, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge)
switch advice {
case fetch.AdviceNothingFound:
log.Error("No snapshots available remotely")
return
case fetch.AdviceUpToDate:
log.Info("Existing snapshot is recent enough, no download needed",
zap.Uint64("existing_slot", localSnaps[0].Slot))
return
case fetch.AdviceFetch:
var remoteSnaps []types.SnapshotSource

if baseSlot != 0 {
log.Info("Fetching snapshots at slot", zap.Uint64("base_slot", baseSlot))

// Ask tracker for snapshots at a specific location
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, baseSlot)
if err != nil {
log.Fatal("Failed to fetch snapshot info", zap.Error(err))
}

// @TODO check if this snapshot already exists
buf, _ := json.MarshalIndent(remoteSnaps, "", "\t")
log.Info("Downloading a snapshot", zap.ByteString("snap", buf))

} else {
log.Info("Finding best snapshot")

// Ask tracker for best snapshots.
remoteSnaps, err = trackerClient.GetBestSnapshots(ctx, -1)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}

// Decide what we want to do.
_, advice := fetch.ShouldFetchSnapshot(localSnaps, remoteSnaps, minSnapAge, maxSnapAge)
switch advice {
case fetch.AdviceNothingFound:
log.Error("No snapshots available remotely")
return
case fetch.AdviceUpToDate:
log.Info("Existing snapshot is recent enough, no download needed",
zap.Uint64("existing_slot", localSnaps[0].Slot))
return
case fetch.AdviceRemoteIsOlder:
log.Info("Remote snapshot is older than local, no download possible")
return
case fetch.AdviceFetchFull:
if !fullSnap {
// If we are not fetching a full snapshot and the base slot isn't matching
// we need to fetch an older incremental snapshot.
log.Info("Full snapshot is newer than local, but not requested")
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, localSnaps[0].BaseSlot)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}
}
case fetch.AdviceFetchIncremental:
log.Info("Full snapshot already found. No need for downloading.")
fullSnap = false // No need to fetch full snap
if !incrementalSnap {
log.Info("Incremental snapshot is newer than local, but only full is requested.")
return
}
case fetch.AdviceFetch:
}
}

// Print snapshot to user.
log.Info("Number of remote snaps found: ", zap.Int("num", len(remoteSnaps)))
if len(remoteSnaps) == 0 {
log.Fatal("Could not find any matching snapshots. Bailing.")
}

snap := &remoteSnaps[0]
buf, _ := json.MarshalIndent(snap, "", "\t")
log.Info("Downloading a snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target))

// Setup progress bars for download.
bars := mpb.New()
sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetch.SidecarClientOpts{
ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser {
bar := bars.New(
size,
mpb.BarStyle(),
mpb.PrependDecorators(decor.Name(name)),
mpb.AppendDecorators(
decor.AverageSpeed(decor.UnitKB, "% .1f"),
decor.Percentage(),
),
)
return bar.ProxyReader(rd)
},
})

// Download.
var fetchOpts fetch.SidecarClientOpts
if term.IsTerminal(int(os.Stdout.Fd())) {
bars := mpb.New()
fetchOpts = fetch.SidecarClientOpts{
ProxyReaderFunc: func(name string, size int64, rd io.Reader) io.ReadCloser {
bar := bars.New(
size,
mpb.BarStyle(),
mpb.PrependDecorators(decor.Name(name)),
mpb.AppendDecorators(
decor.AverageSpeed(decor.UnitKB, "% .1f"),
decor.Percentage(),
),
)
return bar.ProxyReader(rd)
},
}
}

sidecarClient := fetch.NewSidecarClientWithOpts(snap.Target, fetchOpts)

// First pass, if we're fetching fullSnap we want to fetch the fullSnaps but **not** the incremental snap
// Then after completion of the full snap download, we refetch the incremental one so we get the latest one
if fullSnap {
buf, _ := json.MarshalIndent(snap, "", "\t")
log.Info("Downloading full snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target))

downloadSnapshot(ctx, sidecarClient, snap, true, false)

if incrementalSnap {

// If we were downloading a full snapshot, check if there's a newer incremental snapshot we can fetch
// Find latest incremental snapshot
log.Info("Finding incremental snapshot for full slot", zap.Uint64("base_slot", snap.BaseSlot))
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, snap.BaseSlot)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}

if len(remoteSnaps) == 0 {
log.Fatal("No incremental snapshot found")
}

snap = &remoteSnaps[0]
} else {
log.Info("Only full snapshot was requested, not fetching incremental snapshot")
return
}
}

if incrementalSnap {
// Download incremental snapshot
buf, _ := json.MarshalIndent(snap, "", "\t")
log.Info("Downloading incremental snapshot", zap.ByteString("snap", buf), zap.String("target", snap.Target))

// This will fetch the latest incremental snapshot (if fullSnap was specified it would already have been fetched and refreshed)
downloadSnapshot(ctx, sidecarClient, snap, false, true)
}
}

// downloadSnapshot downloads the selected files of snap into the current
// working directory (".") through sidecarClient and blocks until every
// transfer has finished.
//
// File selection is driven by each file's BaseSlot:
//   - files with BaseSlot == 0 are treated as full snapshot files and are
//     downloaded only when full is true;
//   - files with BaseSlot != 0 are treated as incremental snapshot files and
//     are downloaded only when incremental is true.
//
// The selected files are fetched concurrently in a single errgroup pass. The
// group's derived context is cancelled as soon as one download fails, so the
// remaining transfers are asked to stop early.
//
// On failure the error is reported through log.Fatal, which (for a zap logger)
// terminates the process; on success the total download time is logged.
func downloadSnapshot(ctx context.Context, sidecarClient *fetch.SidecarClient, snap *types.SnapshotSource, full bool, incremental bool) {
	log := logger.GetLogger()

	beforeDownload := time.Now()

	group, ctx := errgroup.WithContext(ctx)
	for _, file := range snap.Files {
		isIncremental := file.BaseSlot != 0
		if isIncremental && !incremental {
			continue
		}
		if !isIncremental && !full {
			continue
		}

		file := file // per-iteration copy for the closure (required before Go 1.22)
		group.Go(func() error {
			err := sidecarClient.DownloadSnapshotFile(ctx, ".", file.FileName)
			if err != nil {
				// Neutral wording: this path serves both full and incremental files.
				log.Error("Download failed",
					zap.String("snapshot", file.FileName),
					zap.Error(err))
			}
			return err
		})
	}

	// Wait returns the first non-nil error reported by any download, if any.
	downloadErr := group.Wait()
	downloadDuration := time.Since(beforeDownload)

	if downloadErr != nil {
		log.Fatal("Aborting download",
			zap.Duration("download_time", downloadDuration),
			zap.Error(downloadErr))
	}
	log.Info("Snapshot download completed", zap.Duration("download_time", downloadDuration))
}
2 changes: 1 addition & 1 deletion internal/cmd/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var (
configPath string
internalListen string
listen string
rpc string
rpc string
maxSnapshotAge uint64
)

Expand Down
Loading

0 comments on commit caeefce

Please sign in to comment.