From 80f0517e0063bc656b99e7e243b5bd3c0361d42b Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Sat, 6 Jan 2024 21:36:22 +0800 Subject: [PATCH] makesync: add checksum option, change mb -> kb, store cli version (#111) --- main.go | 9 ++-- pmtiles/makesync.go | 118 ++++++++++++++++++++++++-------------------- 2 files changed, 69 insertions(+), 58 deletions(-) diff --git a/main.go b/main.go index 936a47f..d15c2ed 100644 --- a/main.go +++ b/main.go @@ -62,9 +62,10 @@ var cli struct { } `cmd:"" help:"Verify the correctness of an archive structure, without verifying individual tile contents."` Makesync struct { - Input string `arg:"" type:"existingfile"` - BlockSizeMegabytes int `default:1 help:"The approximate block size, in megabytes. 0 means 1 tile = 1 block."` - HashFunction string `default:fnv1a help:"The hash function."` + Input string `arg:"" type:"existingfile"` + BlockSizeKb int `default:1000 help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."` + HashFunction string `default:fnv1a help:"The hash function."` + Checksum string `help:"Store a checksum in the syncfile."` } `cmd:"" hidden:""` Sync struct { @@ -193,7 +194,7 @@ func main() { logger.Fatalf("Failed to verify archive, %v", err) } case "makesync ": - err := pmtiles.Makesync(logger, cli.Makesync.Input, cli.Makesync.BlockSizeMegabytes) + err := pmtiles.Makesync(logger, version, cli.Makesync.Input, cli.Makesync.BlockSizeKb, cli.Makesync.Checksum) if err != nil { logger.Fatalf("Failed to makesync archive, %v", err) } diff --git a/pmtiles/makesync.go b/pmtiles/makesync.go index 4c09dd1..7dd8fc7 100644 --- a/pmtiles/makesync.go +++ b/pmtiles/makesync.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "crypto/md5" "fmt" "github.com/dustin/go-humanize" "github.com/schollz/progressbar/v3" @@ -13,8 +14,10 @@ import ( "log" "os" "runtime" + "sort" "strconv" "strings" + "sync" "time" ) @@ -30,12 +33,18 @@ type Result struct { Hash uint64 } -func Makesync(logger *log.Logger, file string, block_size_megabytes int) error { +type Syncline struct { + Offset uint64 + Length uint64 + Hash uint64 +} + +func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb int, checksum string) error { ctx := context.Background() start := time.Now() bucketURL, key, err := NormalizeBucketKey("", "", file) - block_size_bytes := uint64(1024 * 1024 * block_size_megabytes) + block_size_bytes := uint64(1000 * block_size_kb) if err != nil { return err @@ -93,6 +102,23 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error { panic(err) } defer output.Close() + output.Write([]byte(fmt.Sprintf("version=%s\n", cli_version))) + + if checksum == "md5" { + localfile, err := os.Open(file) + if err != nil { + panic(err) + } + defer localfile.Close() + reader := bufio.NewReaderSize(localfile, 64*1024*1024) + md5hasher := md5.New() + if _, err := io.Copy(md5hasher, reader); err != nil { + panic(err) + } + md5checksum := md5hasher.Sum(nil) + fmt.Printf("Completed md5 in %v.\n", time.Since(start)) + output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum))) + } output.Write([]byte("hash=fnv1a\n")) output.Write([]byte(fmt.Sprintf("blocksize=%d\n", block_size_bytes))) @@ -104,63 +130,41 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error { var current Block - GetHash := func(offset uint64, length uint64) uint64 { - hasher := fnv.New64a() - r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length)) - if err != nil { - log.Fatal(err) - } + tasks := make(chan Block, 1000) - if _, err := io.Copy(hasher, r); err != nil { - log.Fatal(err) - } - r.Close() - return hasher.Sum64() - } + var wg sync.WaitGroup + var mu sync.Mutex - tasks := make(chan Block, 10000) - intermediate := make(chan Result, 10000) + synclines := make(map[uint64]Syncline) errs, _ := errgroup.WithContext(ctx) // workers for i := 0; i < runtime.GOMAXPROCS(0); i++ { errs.Go(func() error { + wg.Add(1) + hasher := fnv.New64a() for block := range tasks { - intermediate <- Result{block, GetHash(block.Offset, block.Length)} - } - return nil - }) - } - - done := make(chan struct{}) - - go func() { - buffer := make(map[uint64]Result) - nextIndex := uint64(0) - - for i := range intermediate { - buffer[i.Block.Index] = i - - for { - if next, ok := buffer[nextIndex]; ok { - - output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash))) + r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+block.Offset), int64(block.Length)) + if err != nil { + log.Fatal(err) + } - delete(buffer, nextIndex) - nextIndex++ + if _, err := io.Copy(hasher, r); err != nil { + log.Fatal(err) + } + r.Close() - if next.Block.Offset+next.Block.Length == header.TileDataLength { - close(intermediate) - } + sum64 := hasher.Sum64() + mu.Lock() + synclines[block.Start] = Syncline{block.Offset, block.Length, sum64} + mu.Unlock() - } else { - break - } + hasher.Reset() } - } - - done <- struct{}{} - }() + wg.Done() + return nil + }) + } current_index := uint64(0) @@ -196,18 +200,24 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error { blocks += 1 close(tasks) - <-done + wg.Wait() + + var keys []uint64 + for k := range synclines { + keys = append(keys, k) + } + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + + for _, k := range keys { + syncline := synclines[k] + output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash))) + } + fmt.Printf("Created syncfile with %d blocks.\n", blocks) fmt.Printf("Completed makesync in %v.\n", time.Since(start)) return nil } -type Syncline struct { - Offset uint64 - Length uint64 - Hash uint64 -} - func Sync(logger *log.Logger, file string, syncfile string) error { start := time.Now() total_remote_bytes := uint64(0)