Commit 08c3590

fix: resume partial downloads and better handling for existing index file
crebsy committed Jul 10, 2024
1 parent 4b11a6a commit 08c3590
Showing 2 changed files with 75 additions and 48 deletions.
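The heart of the change in main.go: the index is now written to a fixed path under OUT_DIR and re-downloaded only when that file is missing, so a restarted run reuses the existing index instead of fetching it again. A minimal runnable sketch of that check, assuming OUT_DIR and a downloadIndexFile helper shaped like the ones in this diff:

package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// fetchIfMissing mirrors the stat-then-download check added to main():
// only fetch when no local copy exists. The download callback stands in
// for the repo's downloadIndexFile.
func fetchIfMissing(outDir string, download func(path string)) string {
	indexFileName := filepath.Join(outDir, ".moonsnap_index")
	if _, err := os.Stat(indexFileName); errors.Is(err, os.ErrNotExist) {
		download(indexFileName)
	}
	return indexFileName
}

func main() {
	name := fetchIfMissing(os.TempDir(), func(path string) {
		fmt.Println("would download index to", path)
	})
	fmt.Println("using index:", name)
}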
77 changes: 56 additions & 21 deletions main.go
@@ -5,6 +5,7 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
@@ -72,9 +73,11 @@ func main() {
panic(err)
}

// get url for index
indexFileName := downloadIndexFile()
fmt.Println(indexFileName)
indexFileName := filepath.Join(OUT_DIR, ".moonsnap_index")
_, err = os.Stat(indexFileName)
if err != nil && errors.Is(err, os.ErrNotExist) {
downloadIndexFile(indexFileName)
}

file, err := os.Open(indexFileName)
if err != nil {
@@ -165,14 +168,24 @@ func main() {
verifyFiles(&index, OUT_DIR)
}

func downloadIndexFile() string {
func downloadIndexFile(indexFileName string) string {
retries := 0
bar := progressbar.Default(int64(100), "downloading index file")
retry_index:
snapUrlCreds := getSnapUrlCreds("")
snapUrlCreds, err := getSnapUrlCreds("")
if err != nil {
retries += 1
if retries <= MAX_RETRIES {
time.Sleep(1 * time.Second)
goto retry_index
}
}
client := http.Client{}
u, err := url.Parse(snapUrlCreds.Url)
if err != nil {
panic(err)
}
bar.Add(25)
res, err := client.Do(&http.Request{
Method: "GET",
Header: http.Header{
@@ -181,11 +194,16 @@ retry_index:
URL: u,
})
if err != nil {
retries += 1
if retries <= MAX_RETRIES {
time.Sleep(1 * time.Second)
goto retry_index
}
panic(err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
fmt.Printf("%+v\n", snapUrlCreds)
//fmt.Printf("%+v\n", snapUrlCreds)
dfn := "/tmp/index.failed"
df, _ := os.Create(dfn)
defer df.Close()
@@ -194,10 +212,16 @@ retry_index:
panic(err)
}

fmt.Printf("Bad status while downloading index: %s, dumped to %s\n", res.Status, dfn)
goto retry_index
//fmt.Printf("Bad status while downloading index: %s, dumped to %s\n", res.Status, dfn)
retries += 1
if retries <= MAX_RETRIES {
time.Sleep(1 * time.Second)
goto retry_index
}
}
f, err := os.Create("/tmp/index")
bar.Add(50)

f, err := os.Create(indexFileName)
if err != nil {
panic(err)
}
@@ -206,14 +230,17 @@ retry_index:
if err != nil {
panic(err)
}
bar.Add(25)
bar.Close()

return f.Name()
}

func getSnapUrlCreds(fileName string) SnapUrlResponse {
func getSnapUrlCreds(fileName string) (*SnapUrlResponse, error) {
client := http.Client{}
u, err := url.Parse(API_BASE_URL)
if err != nil {
panic(err)
return nil, err
}
values := u.Query()
values.Set("snapKey", SNAP_KEY)
@@ -226,20 +253,20 @@ func getSnapUrlCreds(fileName string) SnapUrlResponse {
URL: u,
})
if err != nil {
panic(err)
return nil, err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
panic(err)
return nil, err
}

var snapUrlResponse SnapUrlResponse
snapUrlResponse := SnapUrlResponse{}
err = json.Unmarshal(body, &snapUrlResponse)
if err != nil {
panic(err)
return nil, err
}
return snapUrlResponse
return &snapUrlResponse, nil
}

func verifyFilesWorker(fileChan <-chan *moonproto.Index_File, bar *progressbar.ProgressBar, outDir string, wg *sync.WaitGroup) {
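The structural change above: getSnapUrlCreds returns (*SnapUrlResponse, error) instead of panicking, and each caller wraps it in the same retry shape used throughout this commit: a label, a counter, and a one-second sleep. A compact runnable model of that shape, assuming a MAX_RETRIES constant (the repo defines its own value outside this diff):

package main

import (
	"errors"
	"fmt"
	"time"
)

const MAX_RETRIES = 5 // assumed value; not shown in this diff

// fetchWithRetry retries a failing call the way downloadIndexFile and
// downloadoor do: goto a label, up to MAX_RETRIES extra attempts,
// sleeping one second between them.
func fetchWithRetry(fetch func() error) error {
	retries := 0
retry:
	err := fetch()
	if err != nil {
		retries += 1
		if retries <= MAX_RETRIES {
			time.Sleep(1 * time.Second)
			goto retry
		}
		return err
	}
	return nil
}

func main() {
	attempts := 0
	err := fetchWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}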
@@ -330,7 +357,15 @@ func downloadoor(index *moonproto.Index, resumeCtx *ResumeCtx, chunkChan chan<-

retry:
// get url for lib
libUrlCreds := getSnapUrlCreds(libName)
libUrlCreds, err := getSnapUrlCreds(libName)
if err != nil {
retries += 1
if retries <= MAX_RETRIES {
time.Sleep(1 * time.Second)
goto retry
}
panic(err)
}
u, err := url.Parse(libUrlCreds.Url)
if err != nil {
retries += 1
@@ -429,10 +464,10 @@ func downloadoor(index *moonproto.Index, resumeCtx *ResumeCtx, chunkChan chan<-
func createFileStructure(index *moonproto.Index, outDir string) int {
totalBytes := 0
for _, file := range index.Files {
fmt.Println(file)
//fmt.Println(file)
totalBytes += int(file.FileSize)
if file.FileMode&uint64(fs.ModeSymlink) > 0 {
fmt.Println(file.FileLinkTarget)
//fmt.Println(file.FileLinkTarget)

newLinkTarget, err := filepath.Abs(path.Join(outDir, *file.FileLinkTarget))
if err != nil {
@@ -451,9 +486,9 @@ func createFileStructure(index *moonproto.Index, outDir string) int {

//TODO handle 0-byte files (e.g. LOCK)
if file.FileSize > 0 {
err = unix.Fallocate(int(f.Fd()), 0, 0, int64(file.FileSize))
err := unix.Fallocate(int(f.Fd()), 0, 0, int64(file.FileSize))
if err != nil {
fmt.Printf("path=%s, size=%d\n", file.FilePath, file.FileSize)
fmt.Printf("Error with: path=%s, size=%d\n", file.FilePath, file.FileSize)
panic(err)
}
}
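Also worth calling out: createFileStructure preallocates every file up front with unix.Fallocate so chunks can later be written at their final offsets. A Linux-only standalone sketch of that call (path and size are placeholders for file.FilePath and file.FileSize):

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	f, err := os.Create("/tmp/prealloc.demo")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	const size = 1 << 20 // 1 MiB stand-in for file.FileSize
	// mode 0 allocates the range and extends the file size.
	if err := unix.Fallocate(int(f.Fd()), 0, 0, size); err != nil {
		fmt.Printf("Error with: path=%s, size=%d\n", f.Name(), size)
		panic(err)
	}

	st, _ := f.Stat()
	fmt.Println("allocated bytes:", st.Size())
}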
46 changes: 19 additions & 27 deletions resume.go
@@ -3,7 +3,6 @@ package main
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"path/filepath"
@@ -40,7 +39,7 @@ func (b Bitset) Get(idx int) bool {
}

func LoadResumeFile() *ResumeCtx {
resumeFileName := filepath.Join(OUT_DIR, ".moonsnap.resume")
resumeFileName := filepath.Join(OUT_DIR, ".moonsnap_resume")
_, err := os.Stat(resumeFileName)
var resumeFile *os.File
if err != nil && errors.Is(err, os.ErrNotExist) {
@@ -149,14 +148,10 @@ func (r *ResumeCtx) setChunkDone(rangeIdx int, chunkIdx int) {
r.lock.Lock()
defer r.lock.Unlock()

entry := r.rangeEntries[rangeIdx]
/*if !ok {
for rIdx := range r.rangeEntries {
fmt.Println(rIdx)
}
panic("wooooooo")
}*/
entry, ok := r.rangeEntries[rangeIdx]
if !ok {
return
}
if !entry.chunkBitset.Get(chunkIdx) {
entry.chunkBitset.Set(chunkIdx)
entry.done++
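The fix in this hunk is defensive: setChunkDone now checks the map lookup's ok flag and returns when a range has already been dropped, replacing the commented-out panic path. The per-chunk bookkeeping rides on the file's Bitset type; a minimal byte-backed version consistent with the Get method named in the hunk header (the upstream layout is an assumption):

package main

import "fmt"

// Bitset tracks one bit per chunk, packed into bytes.
type Bitset []byte

func NewBitset(n int) Bitset { return make(Bitset, (n+7)/8) }

func (b Bitset) Set(idx int)      { b[idx/8] |= 1 << (idx % 8) }
func (b Bitset) Get(idx int) bool { return b[idx/8]&(1<<(idx%8)) != 0 }

func main() {
	bits := NewBitset(10)
	fmt.Println(bits.Get(3)) // false
	bits.Set(3)
	fmt.Println(bits.Get(3)) // true: chunk 3 marked done
}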
@@ -189,33 +184,30 @@ func (r *ResumeCtx) persist() {
panic(err)
}

/*
dropRangeIndexes := []int{}
for rangeIdx, entry := range r.rangeEntries {
r.resumeFile.Seek(entry.fileOffset, io.SeekStart)
_, err := io.Copy(r.resumeFile, bytes.NewReader(entry.chunkBitset))
if err != nil {
panic(err)
}
/*if entry.done >= entry.length {
dropRangeIndexes = append(dropRangeIndexes, rangeIdx)
}
dropRangeIndexes := []int{}
for rangeIdx, entry := range r.rangeEntries {
r.resumeFile.Seek(entry.fileOffset, io.SeekStart)
_, err := io.Copy(r.resumeFile, bytes.NewReader(entry.chunkBitset))
if err != nil {
panic(err)
}

for rangeIdx := range dropRangeIndexes {
fmt.Printf("Dropping rangeIdx %d\n", rangeIdx)
delete(r.rangeEntries, rangeIdx)
if entry.done >= entry.length {
dropRangeIndexes = append(dropRangeIndexes, rangeIdx)
}
*/
}

for rangeIdx := range dropRangeIndexes {
//fmt.Printf("Dropping rangeIdx %d\n", rangeIdx)
delete(r.rangeEntries, rangeIdx)
}

r.resumeFile.Seek(offset, io.SeekStart)
}

func (r *ResumeCtx) close() {
r.persist()

fmt.Println("close()")
err := r.resumeFile.Close()
if err != nil {
panic(err)
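Net effect in persist(): the cleanup that was previously commented out is now live. After each range's bitset is flushed at its saved file offset, ranges whose done count has reached their length are collected and removed from rangeEntries. One reading note: `for rangeIdx := range dropRangeIndexes` iterates slice indices rather than the collected values; the sketch below ranges over the values, which matches the apparent intent. Field and variable names follow the diff; the struct layout is assumed:

package main

import "fmt"

type rangeEntry struct{ done, length int }

func main() {
	entries := map[int]*rangeEntry{
		3: {done: 4, length: 4}, // fully downloaded
		7: {done: 1, length: 8}, // still in progress
	}

	// Collect completed ranges first, then delete them, keeping the
	// two phases explicit.
	dropRangeIndexes := []int{}
	for rangeIdx, entry := range entries {
		if entry.done >= entry.length {
			dropRangeIndexes = append(dropRangeIndexes, rangeIdx)
		}
	}
	for _, rangeIdx := range dropRangeIndexes {
		delete(entries, rangeIdx)
	}

	fmt.Println("remaining ranges:", len(entries)) // 1
}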
