wip on syncfile
bdon committed Jan 19, 2024
1 parent 2fc1763 commit eff0c26
Showing 2 changed files with 108 additions and 105 deletions.
2 changes: 1 addition & 1 deletion main.go
@@ -65,7 +65,7 @@ var cli struct {

Makesync struct {
Input string `arg:"" type:"existingfile"`
BlockSizeKb int `default:1000 help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."`
BlockSizeKb int `default:20 help:"The approximate block size, in kilobytes. 0 means 1 tile = 1 block."`
HashFunction string `default:fnv1a help:"The hash function."`
Checksum string `help:"Store a checksum in the syncfile."`
} `cmd:"" hidden:""`
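
The change above lowers the default makesync block size from 1000 KB to 20 KB, trading a larger syncfile for finer-grained block matching. Assuming kong's conventional kebab-case flag naming (an assumption; the mapping is not shown in this diff), the field surfaces on the command line roughly as:

    pmtiles makesync archive.pmtiles --block-size-kb=20

where archive.pmtiles is a placeholder input name.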
211 changes: 107 additions & 104 deletions pmtiles/makesync.go
@@ -1,11 +1,12 @@
package pmtiles

import (
"encoding/binary"
"bufio"
"bytes"
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"fmt"
"github.com/dustin/go-humanize"
"github.com/schollz/progressbar/v3"
@@ -16,30 +17,70 @@ import (
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
)

type Block struct {
Index uint64 // starts at 0
Start uint64 // the start tileID
Offset uint64 // the offset in the file, in bytes
Length uint64 // the length, in bytes
type SyncBlock struct {
Start uint64 // the start tileID of the block
Offset uint64 // the offset in the source archive
Length uint64 // the length of the block
Hash uint64 // the hash of the block
}

type Result struct {
Block Block
Hash uint64
type SyncMetadata struct {
Version string
BlockSize uint64
HashType string
HashSize uint8
ChecksumType string
Checksum string
NumBlocks int
}

type Syncline struct {
Offset uint64
Length uint64
Hash uint64
func SerializeSyncBlocks(output io.Writer, blocks []SyncBlock) {
tmp := make([]byte, binary.MaxVarintLen64)
var n int

lastStartId := uint64(0)
lastOffset := uint64(0)
for _, block := range blocks {
n = binary.PutUvarint(tmp, uint64(block.Start-lastStartId))
output.Write(tmp[:n])
n = binary.PutUvarint(tmp, uint64(block.Offset-lastOffset))
output.Write(tmp[:n])
n = binary.PutUvarint(tmp, uint64(block.Length))
output.Write(tmp[:n])
binary.LittleEndian.PutUint64(tmp, block.Hash)
output.Write(tmp[0:8])

lastStartId = block.Start
lastOffset = block.Offset
}
}

func DeserializeSyncBlocks(num_blocks int, reader *bufio.Reader) []SyncBlock {
blocks := make([]SyncBlock, 0)

lastStartId := uint64(0)
lastOffset := uint64(0)
buf := make([]byte, 8)

for i := 0; i < num_blocks; i++ {
start, _ := binary.ReadUvarint(reader)
offset, _ := binary.ReadUvarint(reader)
length, _ := binary.ReadUvarint(reader)
_, _ = io.ReadFull(reader, buf)
blocks = append(blocks, SyncBlock{Start: lastStartId + start, Offset: lastOffset + offset, Length: length, Hash: binary.LittleEndian.Uint64(buf)})

lastStartId = lastStartId + start
lastOffset = lastOffset + offset
}

return blocks
}
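
Together, SerializeSyncBlocks and DeserializeSyncBlocks define the block wire format: per block, three uvarints carrying the delta-encoded start tile ID, the delta-encoded offset, and the raw length, followed by the hash as 8 fixed little-endian bytes. A minimal round-trip sketch of that encoding, assuming the type and both functions above are in scope (the block values are made up; bytes, bufio, and fmt are already imported in this file):

    func exampleBlockRoundTrip() {
        // Two hypothetical blocks; Start and Offset must be non-decreasing
        // because the encoder stores them as deltas.
        blocks := []SyncBlock{
            {Start: 0, Offset: 0, Length: 19000, Hash: 0x1234},
            {Start: 512, Offset: 19000, Length: 21000, Hash: 0x5678},
        }
        var buf bytes.Buffer
        SerializeSyncBlocks(&buf, blocks)
        decoded := DeserializeSyncBlocks(len(blocks), bufio.NewReader(&buf))
        fmt.Println(decoded[1].Start, decoded[1].Offset) // prints: 512 19000
    }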

// measure the number of "missing blocks"
func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb int, checksum string) error {
ctx := context.Background()
start := time.Now()
@@ -103,7 +144,6 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
panic(err)
}
defer output.Close()
output.Write([]byte(fmt.Sprintf("version=%s\n", cli_version)))

if checksum == "md5" {
localfile, err := os.Open(file)
@@ -118,25 +158,22 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
}
md5checksum := md5hasher.Sum(nil)
fmt.Printf("Completed md5 in %v.\n", time.Since(start))
output.Write([]byte(fmt.Sprintf("md5=%x\n", md5checksum)))
fmt.Printf("md5=%x\n", md5checksum)
}

output.Write([]byte("hash=fnv1a\n"))
output.Write([]byte(fmt.Sprintf("blocksize=%d\n", block_size_bytes)))

bar := progressbar.Default(
int64(header.TileEntriesCount),
"writing syncfile",
)

var current Block
var current SyncBlock

tasks := make(chan Block, 1000)
tasks := make(chan SyncBlock, 1000)

var wg sync.WaitGroup
var mu sync.Mutex

synclines := make(map[uint64]Syncline)
blocks := make([]SyncBlock, 0)

errs, _ := errgroup.WithContext(ctx)
// workers
@@ -155,9 +192,10 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
}
r.Close()

sum64 := hasher.Sum64()
block.Hash = hasher.Sum64()

mu.Lock()
synclines[block.Start] = Syncline{block.Offset, block.Length, sum64}
blocks = append(blocks, block)
mu.Unlock()

hasher.Reset()
@@ -167,13 +205,9 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
})
}

current_index := uint64(0)

blocks := 0
CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
bar.Add(1)
if current.Length == 0 {
current.Index = current_index
current.Start = e.TileId
current.Offset = e.Offset
current.Length = uint64(e.Length)
@@ -182,12 +216,10 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
} else if e.Offset > current.Offset+uint64(current.Length) {
panic("Invalid clustering of archive detected - check with verify")
} else {
// check this logic
if current.Length+uint64(e.Length) > block_size_bytes {
tasks <- Block{current.Index, current.Start, current.Offset, current.Length}
blocks += 1
tasks <- SyncBlock{current.Start, current.Offset, current.Length, 0}

current_index += 1
current.Index = current_index
current.Start = e.TileId
current.Offset = e.Offset
current.Length = uint64(e.Length)
@@ -197,41 +229,27 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
}
})

tasks <- Block{current.Index, current.Start, current.Offset, current.Length}
blocks += 1
tasks <- SyncBlock{current.Start, current.Offset, current.Length, 0}
close(tasks)

wg.Wait()

var keys []uint64
for k := range synclines {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

tmp := make([]byte, binary.MaxVarintLen64)
var n int
sort.Slice(blocks, func(i, j int) bool { return blocks[i].Start < blocks[j].Start })

lastStartId := uint64(0)
lastOffset := uint64(0)
for _, k := range keys {
syncline := synclines[k]
n = binary.PutUvarint(tmp, uint64(k - lastStartId))
output.Write(tmp[:n])
n = binary.PutUvarint(tmp, uint64(syncline.Offset - lastOffset))
output.Write(tmp[:n])
n = binary.PutUvarint(tmp, uint64(syncline.Length))
output.Write(tmp[:n])
binary.LittleEndian.PutUint64(tmp, syncline.Hash)
output.Write(tmp[0:8])
metadata_bytes, err := json.Marshal(SyncMetadata{
Version: cli_version,
HashSize: 8,
BlockSize: block_size_bytes,
HashType: "fnv1a",
NumBlocks: len(blocks),
})

lastStartId = k
lastOffset = syncline.Offset
output.Write(metadata_bytes)
output.Write([]byte{'\n'})

// output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", k, syncline.Offset, syncline.Length, syncline.Hash)))
}
SerializeSyncBlocks(output, blocks)

fmt.Printf("Created syncfile with %d blocks.\n", blocks)
fmt.Printf("Created syncfile with %d blocks.\n", len(blocks))
fmt.Printf("Completed makesync in %v.\n", time.Since(start))
return nil
}
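
A syncfile as written by Makesync is therefore one JSON metadata line terminated by '\n', followed immediately by the binary block stream. With the fields set above, the first line of a hypothetical syncfile might read (all values illustrative):

    {"Version":"1.14.0","BlockSize":20000,"HashType":"fnv1a","HashSize":8,"ChecksumType":"","Checksum":"","NumBlocks":4242}

ChecksumType and Checksum serialize as empty strings here because the struct has no omitempty tags and this call does not set them.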
@@ -240,30 +258,20 @@ func Sync(logger *log.Logger, file string, syncfile string) error {
start := time.Now()
total_remote_bytes := uint64(0)

by_start_id := make(map[uint64]Syncline)

sync, err := os.Open(syncfile)
if err != nil {
return fmt.Errorf("Error opening syncfile: %v\n", err)
}
defer sync.Close()
scanner := bufio.NewScanner(sync)
for scanner.Scan() {
line := scanner.Text()
parts := strings.Fields(line)
if len(parts) != 4 {
continue
}

start_id, _ := strconv.ParseUint(parts[0], 10, 64)
offset, _ := strconv.ParseUint(parts[1], 10, 64)
length, _ := strconv.ParseUint(parts[2], 10, 64)
total_remote_bytes += length
hash, _ := strconv.ParseUint(parts[3], 16, 64)
by_start_id[start_id] = Syncline{offset, length, hash}
}
buffered_reader := bufio.NewReader(sync)

var metadata SyncMetadata
json_bytes, _ := buffered_reader.ReadSlice('\n')
json.Unmarshal(json_bytes, &metadata)

blocks := DeserializeSyncBlocks(metadata.NumBlocks, buffered_reader)

// open the existing archive
ctx := context.Background()

@@ -297,19 +305,19 @@ func Sync(logger *log.Logger, file string, syncfile string) error {
return fmt.Errorf("Error: archive must be clustered for makesync.")
}

GetHash := func(offset uint64, length uint64) uint64 {
hasher := fnv.New64a()
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
if err != nil {
log.Fatal(err)
}
// GetHash := func(offset uint64, length uint64) uint64 {
// hasher := fnv.New64a()
// r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
// if err != nil {
// log.Fatal(err)
// }

if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()
return hasher.Sum64()
}
// if _, err := io.Copy(hasher, r); err != nil {
// log.Fatal(err)
// }
// r.Close()
// return hasher.Sum64()
// }

var CollectEntries func(uint64, uint64, func(EntryV3))

@@ -339,31 +347,26 @@ func Sync(logger *log.Logger, file string, syncfile string) error {
"calculating diff",
)

total_blocks := len(by_start_id)
hits := 0

CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
bar.Add(1)

potential_match, ok := by_start_id[e.TileId]
if ok {
hash_result := GetHash(e.Offset, potential_match.Length)
if hash_result == potential_match.Hash {
hits += 1
delete(by_start_id, e.TileId)
}
}
// hash_result := GetHash(e.Offset, potential_match.Length)
// if hash_result == potential_match.Hash {
// hits += 1
// delete(by_start_id, e.TileId)
// }
})

to_transfer := uint64(0)
for _, v := range by_start_id {
to_transfer += v.Length
}
// for _, v := range by_start_id {
// to_transfer += v.Length
// }
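
With the hash comparison and size accounting commented out, hits, to_transfer, and total_remote_bytes all remain zero, so the report below is a placeholder (pct divides zero by zero). A sketch of where this WIP appears to be heading, indexing the deserialized blocks by start tile ID and assuming the commented-out GetHash helper is restored (not part of this commit):

    // Hypothetical completion of the diff pass.
    by_start := make(map[uint64]SyncBlock, len(blocks))
    for _, b := range blocks {
        by_start[b.Start] = b
        total_remote_bytes += b.Length
    }
    CollectEntries(header.RootOffset, header.RootLength, func(e EntryV3) {
        if b, ok := by_start[e.TileId]; ok && GetHash(e.Offset, b.Length) == b.Hash {
            hits += 1
            delete(by_start, e.TileId)
        }
    })
    // Whatever remains unmatched must be transferred.
    for _, b := range by_start {
        to_transfer += b.Length
    }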

blocks_matched := float64(hits) / float64(total_blocks) * 100
blocks_matched := float64(hits) / float64(len(blocks)) * 100
pct := float64(to_transfer) / float64(total_remote_bytes) * 100

fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", hits, total_blocks, blocks_matched, humanize.Bytes(to_transfer), humanize.Bytes(total_remote_bytes), pct)
fmt.Printf("%d/%d blocks matched (%.1f%%), need to transfer %s/%s (%.1f%%).\n", hits, len(blocks), blocks_matched, humanize.Bytes(to_transfer), humanize.Bytes(total_remote_bytes), pct)

fmt.Printf("Completed sync in %v.\n", time.Since(start))
return nil