Skip to content

Commit

Permalink
makesync: add checksum option, change mb -> kb, store cli version (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon authored Jan 6, 2024
1 parent 06b5729 commit 80f0517
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 58 deletions.
9 changes: 5 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ var cli struct {
} `cmd:"" help:"Verify the correctness of an archive structure, without verifying individual tile contents."`

Makesync struct {
Input string `arg:"" type:"existingfile"`
BlockSizeMegabytes int `default:1 help:"The approximate block size, in megabytes. 0 means 1 tile = 1 block."`
HashFunction string `default:fnv1a help:"The hash function."`
Input string `arg:"" type:"existingfile"`
BlockSizeKb int `default:1000 help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."`
HashFunction string `default:fnv1a help:"The hash function."`
Checksum string `help:"Store a checksum in the syncfile."`
} `cmd:"" hidden:""`

Sync struct {
Expand Down Expand Up @@ -193,7 +194,7 @@ func main() {
logger.Fatalf("Failed to verify archive, %v", err)
}
case "makesync <input>":
err := pmtiles.Makesync(logger, cli.Makesync.Input, cli.Makesync.BlockSizeMegabytes)
err := pmtiles.Makesync(logger, version, cli.Makesync.Input, cli.Makesync.BlockSizeKb, cli.Makesync.Checksum)
if err != nil {
logger.Fatalf("Failed to makesync archive, %v", err)
}
Expand Down
118 changes: 64 additions & 54 deletions pmtiles/makesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"crypto/md5"
"fmt"
"github.com/dustin/go-humanize"
"github.com/schollz/progressbar/v3"
Expand All @@ -13,8 +14,10 @@ import (
"log"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
)

Expand All @@ -30,12 +33,18 @@ type Result struct {
Hash uint64
}

func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
type Syncline struct {
Offset uint64
Length uint64
Hash uint64
}

func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb int, checksum string) error {
ctx := context.Background()
start := time.Now()

bucketURL, key, err := NormalizeBucketKey("", "", file)
block_size_bytes := uint64(1024 * 1024 * block_size_megabytes)
block_size_bytes := uint64(1000 * block_size_kb)

if err != nil {
return err
Expand Down Expand Up @@ -93,6 +102,23 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
panic(err)
}
defer output.Close()
output.Write([]byte(fmt.Sprintf("version=%s\n", cli_version)))

if checksum == "md5" {
localfile, err := os.Open(file)
if err != nil {
panic(err)
}
defer localfile.Close()
reader := bufio.NewReaderSize(localfile, 64*1024*1024)
md5hasher := md5.New()
if _, err := io.Copy(md5hasher, reader); err != nil {
panic(err)
}
md5checksum := md5hasher.Sum(nil)
fmt.Printf("Completed md5 in %v.\n", time.Since(start))
output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum)))
}

output.Write([]byte("hash=fnv1a\n"))
output.Write([]byte(fmt.Sprintf("blocksize=%d\n", block_size_bytes)))
Expand All @@ -104,63 +130,41 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {

var current Block

GetHash := func(offset uint64, length uint64) uint64 {
hasher := fnv.New64a()
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
if err != nil {
log.Fatal(err)
}
tasks := make(chan Block, 1000)

if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()
return hasher.Sum64()
}
var wg sync.WaitGroup
var mu sync.Mutex

tasks := make(chan Block, 10000)
intermediate := make(chan Result, 10000)
synclines := make(map[uint64]Syncline)

errs, _ := errgroup.WithContext(ctx)
// workers
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
errs.Go(func() error {
wg.Add(1)
hasher := fnv.New64a()
for block := range tasks {
intermediate <- Result{block, GetHash(block.Offset, block.Length)}
}
return nil
})
}

done := make(chan struct{})

go func() {
buffer := make(map[uint64]Result)
nextIndex := uint64(0)

for i := range intermediate {
buffer[i.Block.Index] = i

for {
if next, ok := buffer[nextIndex]; ok {

output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash)))
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+block.Offset), int64(block.Length))
if err != nil {
log.Fatal(err)
}

delete(buffer, nextIndex)
nextIndex++
if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()

if next.Block.Offset+next.Block.Length == header.TileDataLength {
close(intermediate)
}
sum64 := hasher.Sum64()
mu.Lock()
synclines[block.Start] = Syncline{block.Offset, block.Length, sum64}
mu.Unlock()

} else {
break
}
hasher.Reset()
}
}

done <- struct{}{}
}()
wg.Done()
return nil
})
}

current_index := uint64(0)

Expand Down Expand Up @@ -196,18 +200,24 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
blocks += 1
close(tasks)

<-done
wg.Wait()

var keys []uint64
for k := range synclines {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

for _, k := range keys {
syncline := synclines[k]
output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash)))
}

fmt.Printf("Created syncfile with %d blocks.\n", blocks)
fmt.Printf("Completed makesync in %v.\n", time.Since(start))
return nil
}

type Syncline struct {
Offset uint64
Length uint64
Hash uint64
}

func Sync(logger *log.Logger, file string, syncfile string) error {
start := time.Now()
total_remote_bytes := uint64(0)
Expand Down

0 comments on commit 80f0517

Please sign in to comment.