Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

makesync: add checksum option, change mb -> kb, store cli version #111

Merged
merged 1 commit into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ var cli struct {
} `cmd:"" help:"Verify the correctness of an archive structure, without verifying individual tile contents."`

Makesync struct {
Input string `arg:"" type:"existingfile"`
BlockSizeMegabytes int `default:1 help:"The approximate block size, in megabytes. 0 means 1 tile = 1 block."`
HashFunction string `default:fnv1a help:"The hash function."`
Input string `arg:"" type:"existingfile"`
BlockSizeKb int `default:1000 help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."`
HashFunction string `default:fnv1a help:"The hash function."`
Checksum string `help:"Store a checksum in the syncfile."`
} `cmd:"" hidden:""`

Sync struct {
Expand Down Expand Up @@ -193,7 +194,7 @@ func main() {
logger.Fatalf("Failed to verify archive, %v", err)
}
case "makesync <input>":
err := pmtiles.Makesync(logger, cli.Makesync.Input, cli.Makesync.BlockSizeMegabytes)
err := pmtiles.Makesync(logger, version, cli.Makesync.Input, cli.Makesync.BlockSizeKb, cli.Makesync.Checksum)
if err != nil {
logger.Fatalf("Failed to makesync archive, %v", err)
}
Expand Down
118 changes: 64 additions & 54 deletions pmtiles/makesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"crypto/md5"
"fmt"
"github.com/dustin/go-humanize"
"github.com/schollz/progressbar/v3"
Expand All @@ -13,8 +14,10 @@ import (
"log"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
)

Expand All @@ -30,12 +33,18 @@ type Result struct {
Hash uint64
}

func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
type Syncline struct {
Offset uint64
Length uint64
Hash uint64
}

func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb int, checksum string) error {
ctx := context.Background()
start := time.Now()

bucketURL, key, err := NormalizeBucketKey("", "", file)
block_size_bytes := uint64(1024 * 1024 * block_size_megabytes)
block_size_bytes := uint64(1000 * block_size_kb)

if err != nil {
return err
Expand Down Expand Up @@ -93,6 +102,23 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
panic(err)
}
defer output.Close()
output.Write([]byte(fmt.Sprintf("version=%s\n", cli_version)))

if checksum == "md5" {
localfile, err := os.Open(file)
if err != nil {
panic(err)
}
defer localfile.Close()
reader := bufio.NewReaderSize(localfile, 64*1024*1024)
md5hasher := md5.New()
if _, err := io.Copy(md5hasher, reader); err != nil {
panic(err)
}
md5checksum := md5hasher.Sum(nil)
fmt.Printf("Completed md5 in %v.\n", time.Since(start))
output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum)))
}

output.Write([]byte("hash=fnv1a\n"))
output.Write([]byte(fmt.Sprintf("blocksize=%d\n", block_size_bytes)))
Expand All @@ -104,63 +130,41 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {

var current Block

GetHash := func(offset uint64, length uint64) uint64 {
hasher := fnv.New64a()
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
if err != nil {
log.Fatal(err)
}
tasks := make(chan Block, 1000)

if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()
return hasher.Sum64()
}
var wg sync.WaitGroup
var mu sync.Mutex

tasks := make(chan Block, 10000)
intermediate := make(chan Result, 10000)
synclines := make(map[uint64]Syncline)

errs, _ := errgroup.WithContext(ctx)
// workers
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
errs.Go(func() error {
wg.Add(1)
hasher := fnv.New64a()
for block := range tasks {
intermediate <- Result{block, GetHash(block.Offset, block.Length)}
}
return nil
})
}

done := make(chan struct{})

go func() {
buffer := make(map[uint64]Result)
nextIndex := uint64(0)

for i := range intermediate {
buffer[i.Block.Index] = i

for {
if next, ok := buffer[nextIndex]; ok {

output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash)))
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+block.Offset), int64(block.Length))
if err != nil {
log.Fatal(err)
}

delete(buffer, nextIndex)
nextIndex++
if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()

if next.Block.Offset+next.Block.Length == header.TileDataLength {
close(intermediate)
}
sum64 := hasher.Sum64()
mu.Lock()
synclines[block.Start] = Syncline{block.Offset, block.Length, sum64}
mu.Unlock()

} else {
break
}
hasher.Reset()
}
}

done <- struct{}{}
}()
wg.Done()
return nil
})
}

current_index := uint64(0)

Expand Down Expand Up @@ -196,18 +200,24 @@ func Makesync(logger *log.Logger, file string, block_size_megabytes int) error {
blocks += 1
close(tasks)

<-done
wg.Wait()

var keys []uint64
for k := range synclines {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

for _, k := range keys {
syncline := synclines[k]
output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash)))
}

fmt.Printf("Created syncfile with %d blocks.\n", blocks)
fmt.Printf("Completed makesync in %v.\n", time.Since(start))
return nil
}

type Syncline struct {
Offset uint64
Length uint64
Hash uint64
}

func Sync(logger *log.Logger, file string, syncfile string) error {
start := time.Now()
total_remote_bytes := uint64(0)
Expand Down