From eff0c263162ee30fffaec11bdc235dd3bfe49413 Mon Sep 17 00:00:00 2001
From: Brandon Liu
Date: Fri, 19 Jan 2024 15:42:49 +0800
Subject: [PATCH] wip on syncfile

---
 main.go             |   2 +-
 pmtiles/makesync.go | 211 ++++++++++++++++++++++----------------------
 2 files changed, 108 insertions(+), 105 deletions(-)

diff --git a/main.go b/main.go
index cca4084..52c135f 100644
--- a/main.go
+++ b/main.go
@@ -65,7 +65,7 @@ var cli struct {
 	Makesync struct {
 		Input        string `arg:"" type:"existingfile"`
-		BlockSizeKb  int    `default:1000 help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."`
+		BlockSizeKb  int    `default:20 help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."`
 		HashFunction string `default:fnv1a help:"The hash function."`
 		Checksum     string `help:"Store a checksum in the syncfile."`
 	} `cmd:"" hidden:""`

diff --git a/pmtiles/makesync.go b/pmtiles/makesync.go
index 48a78f2..bb5bee2 100644
--- a/pmtiles/makesync.go
+++ b/pmtiles/makesync.go
@@ -1,11 +1,12 @@
 package pmtiles
 
 import (
-	"encoding/binary"
 	"bufio"
 	"bytes"
 	"context"
 	"crypto/md5"
+	"encoding/binary"
+	"encoding/json"
 	"fmt"
 	"github.com/dustin/go-humanize"
 	"github.com/schollz/progressbar/v3"
@@ -16,30 +17,70 @@ import (
 	"os"
 	"runtime"
 	"sort"
-	"strconv"
-	"strings"
 	"sync"
 	"time"
 )
 
-type Block struct {
-	Index  uint64 // starts at 0
-	Start  uint64 // the start tileID
-	Offset uint64 // the offset in the file, in bytes
-	Length uint64 // the length, in bytes
+type SyncBlock struct {
+	Start  uint64 // the start tileID of the block
+	Offset uint64 // the offset in the source archive
+	Length uint64 // the length of the block
+	Hash   uint64 // the hash of the block
 }
 
-type Result struct {
-	Block Block
-	Hash  uint64
+type SyncMetadata struct {
+	Version      string
+	BlockSize    uint64
+	HashType     string
+	HashSize     uint8
+	ChecksumType string
+	Checksum     string
+	NumBlocks    int
 }
 
-type Syncline struct {
-	Offset uint64
-	Length uint64
-	Hash   uint64
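+// Each block record in the syncfile is three uvarints (delta-encoded
+// start tile ID, delta-encoded offset, raw length) followed by the
+// block's fixed-width 8-byte little-endian hash.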
output.Write([]byte(fmt.Sprintf("version=%s\n", cli_version))) if checksum == "md5" { localfile, err := os.Open(file) @@ -118,25 +158,22 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb } md5checksum := md5hasher.Sum(nil) fmt.Printf("Completed md5 in %v.\n", time.Since(start)) - output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum))) + fmt.Printf("md5=%x\n", md5checksum) } - output.Write([]byte("hash=fnv1a\n")) - output.Write([]byte(fmt.Sprintf("blocksize=%d\n", block_size_bytes))) - bar := progressbar.Default( int64(header.TileEntriesCount), "writing syncfile", ) - var current Block + var current SyncBlock - tasks := make(chan Block, 1000) + tasks := make(chan SyncBlock, 1000) var wg sync.WaitGroup var mu sync.Mutex - synclines := make(map[uint64]Syncline) + blocks := make([]SyncBlock, 0) errs, _ := errgroup.WithContext(ctx) // workers @@ -155,9 +192,10 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb } r.Close() - sum64 := hasher.Sum64() + block.Hash = hasher.Sum64() + mu.Lock() - synclines[block.Start] = Syncline{block.Offset, block.Length, sum64} + blocks = append(blocks, block) mu.Unlock() hasher.Reset() @@ -167,13 +205,9 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb }) } - current_index := uint64(0) - - blocks := 0 CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) { bar.Add(1) if current.Length == 0 { - current.Index = current_index current.Start = e.TileId current.Offset = e.Offset current.Length = uint64(e.Length) @@ -182,12 +216,10 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb } else if e.Offset > current.Offset+uint64(current.Length) { panic("Invalid clustering of archive detected - check with verify") } else { + // check this logic if current.Length+uint64(e.Length) > block_size_bytes { - tasks <- Block{current.Index, current.Start, current.Offset, current.Length} - blocks += 1 + tasks <- SyncBlock{current.Start, current.Offset, current.Length, 0} - current_index += 1 - current.Index = current_index current.Start = e.TileId current.Offset = e.Offset current.Length = uint64(e.Length) @@ -197,41 +229,27 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb } }) - tasks <- Block{current.Index, current.Start, current.Offset, current.Length} - blocks += 1 + tasks <- SyncBlock{current.Start, current.Offset, current.Length, 0} close(tasks) wg.Wait() - var keys []uint64 - for k := range synclines { - keys = append(keys, k) - } - sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) - - tmp := make([]byte, binary.MaxVarintLen64) - var n int + sort.Slice(blocks, func(i, j int) bool { return blocks[i].Start < blocks[j].Start }) - lastStartId := uint64(0) - lastOffset := uint64(0) - for _, k := range keys { - syncline := synclines[k] - n = binary.PutUvarint(tmp, uint64(k - lastStartId)) - output.Write(tmp[:n]) - n = binary.PutUvarint(tmp, uint64(syncline.Offset - lastOffset)) - output.Write(tmp[:n]) - n = binary.PutUvarint(tmp, uint64(syncline.Length)) - output.Write(tmp[:n]) - binary.LittleEndian.PutUint64(tmp, syncline.Hash) - output.Write(tmp[0:8]) + metadata_bytes, err := json.Marshal(SyncMetadata{ + Version: cli_version, + HashSize: 8, + BlockSize: block_size_bytes, + HashType: "fnv1a", + NumBlocks: len(blocks), + }) - lastStartId = k - lastOffset = syncline.Offset + output.Write(metadata_bytes) + output.Write([]byte{'\n'}) - // 
output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash))) - } + SerializeSyncBlocks(output, blocks) - fmt.Printf("Created syncfile with %d blocks.\n", blocks) + fmt.Printf("Created syncfile with %d blocks.\n", len(blocks)) fmt.Printf("Completed makesync in %v.\n", time.Since(start)) return nil } @@ -240,30 +258,20 @@ func Sync(logger *log.Logger, file string, syncfile string) error { start := time.Now() total_remote_bytes := uint64(0) - by_start_id := make(map[uint64]Syncline) - sync, err := os.Open(syncfile) if err != nil { return fmt.Errorf("Error opening syncfile: %v\n", err) } defer sync.Close() - scanner := bufio.NewScanner(sync) - for scanner.Scan() { - line := scanner.Text() - parts := strings.Fields(line) - if len(parts) != 4 { - continue - } - start_id, _ := strconv.ParseUint(parts[0], 10, 64) - offset, _ := strconv.ParseUint(parts[1], 10, 64) - length, _ := strconv.ParseUint(parts[2], 10, 64) - total_remote_bytes += length - hash, _ := strconv.ParseUint(parts[3], 16, 64) - by_start_id[start_id] = Syncline{offset, length, hash} - } + buffered_reader := bufio.NewReader(sync) + + var metadata SyncMetadata + json_bytes, _ := buffered_reader.ReadSlice('\n') - // open the existing archive + json.Unmarshal(json_bytes, &metadata) + + blocks := DeserializesyncBlocks(metadata.NumBlocks, buffered_reader) ctx := context.Background() @@ -297,19 +305,19 @@ func Sync(logger *log.Logger, file string, syncfile string) error { return fmt.Errorf("Error: archive must be clustered for makesync.") } - GetHash := func(offset uint64, length uint64) uint64 { - hasher := fnv.New64a() - r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length)) - if err != nil { - log.Fatal(err) - } + // GetHash := func(offset uint64, length uint64) uint64 { + // hasher := fnv.New64a() + // r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length)) + // if err != nil { + // log.Fatal(err) + // } - if _, err := io.Copy(hasher, r); err != nil { - log.Fatal(err) - } - r.Close() - return hasher.Sum64() - } + // if _, err := io.Copy(hasher, r); err != nil { + // log.Fatal(err) + // } + // r.Close() + // return hasher.Sum64() + // } var CollectEntries func(uint64, uint64, func(EntryV3)) @@ -339,31 +347,26 @@ func Sync(logger *log.Logger, file string, syncfile string) error { "calculating diff", ) - total_blocks := len(by_start_id) hits := 0 CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) { bar.Add(1) - - potential_match, ok := by_start_id[e.TileId] - if ok { - hash_result := GetHash(e.Offset, potential_match.Length) - if hash_result == potential_match.Hash { - hits += 1 - delete(by_start_id, e.TileId) - } - } +// hash_result := GetHash(e.Offset, potential_match.Length) +// if hash_result == potential_match.Hash { +// hits += 1 +// delete(by_start_id, e.TileId) +// } }) to_transfer := uint64(0) - for _, v := range by_start_id { - to_transfer += v.Length - } + // for _, v := range by_start_id { + // to_transfer += v.Length + // } - blocks_matched := float64(hits) / float64(total_blocks) * 100 + blocks_matched := float64(hits) / float64(len(blocks)) * 100 pct := float64(to_transfer) / float64(total_remote_bytes) * 100 - fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", hits, total_blocks, blocks_matched, humanize.Bytes(to_transfer), humanize.Bytes(total_remote_bytes), pct) + fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", hits, 
+		// hash_result := GetHash(e.Offset, potential_match.Length)
+		// if hash_result == potential_match.Hash {
+		// 	hits += 1
+		// 	delete(by_start_id, e.TileId)
+		// }
 	})
 
 	to_transfer := uint64(0)
-	for _, v := range by_start_id {
-		to_transfer += v.Length
-	}
+	// for _, v := range by_start_id {
+	// 	to_transfer += v.Length
+	// }
 
-	blocks_matched := float64(hits) / float64(total_blocks) * 100
+	blocks_matched := float64(hits) / float64(len(blocks)) * 100
 	pct := float64(to_transfer) / float64(total_remote_bytes) * 100
 
-	fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", hits, total_blocks, blocks_matched, humanize.Bytes(to_transfer), humanize.Bytes(total_remote_bytes), pct)
+	fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", hits, len(blocks), blocks_matched, humanize.Bytes(to_transfer), humanize.Bytes(total_remote_bytes), pct)
 
 	fmt.Printf("Completed sync in %v.\n", time.Since(start))
 	return nil