Skip to content

Commit

Permalink
store sync lines in memory; initialize one hasher per goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon committed Jan 6, 2024
1 parent 87ba271 commit fdab3ef
Showing 1 changed file with 29 additions and 22 deletions.
51 changes: 29 additions & 22 deletions pmtiles/makesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type Result struct {
Hash uint64
}

type Syncline struct {
Offset uint64
Length uint64
Hash uint64
}

func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb int, checksum string) error {
ctx := context.Background()
start := time.Now()
Expand Down Expand Up @@ -122,36 +128,36 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb

var current Block

GetHash := func(offset uint64, length uint64) uint64 {
hasher := fnv.New64a()
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+offset), int64(length))
if err != nil {
log.Fatal(err)
}

if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()
return hasher.Sum64()
}

tasks := make(chan Block, 10000)
intermediate := make(chan Result, 10000)

errs, _ := errgroup.WithContext(ctx)
// workers
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
errs.Go(func() error {
hasher := fnv.New64a()
for block := range tasks {
intermediate <- Result{block, GetHash(block.Offset, block.Length)}
r, err := bucket.NewRangeReader(ctx, key, int64(header.TileDataOffset+block.Offset), int64(block.Length))
if err != nil {
log.Fatal(err)
}

if _, err := io.Copy(hasher, r); err != nil {
log.Fatal(err)
}
r.Close()

intermediate <- Result{block, hasher.Sum64()}
hasher.Reset()
}
return nil
})
}

done := make(chan struct{})

synclines := make([][]uint64, 0)

go func() {
buffer := make(map[uint64]Result)
nextIndex := uint64(0)
Expand All @@ -162,7 +168,9 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
for {
if next, ok := buffer[nextIndex]; ok {

output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash)))
// output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash)))

synclines = append(synclines, []uint64{next.Block.Start, next.Block.Offset, next.Block.Length, next.Hash})

delete(buffer, nextIndex)
nextIndex++
Expand Down Expand Up @@ -215,17 +223,16 @@ func Makesync(logger *log.Logger, cli_version string, file string, block_size_kb
close(tasks)

<-done

for _, s := range synclines {
output.Write([]byte(fmt.Sprintf("%d\t%d\t%d\t%x\n", s[0], s[1], s[2], s[3])))
}

fmt.Printf("Created syncfile with %d blocks.\n", blocks)
fmt.Printf("Completed makesync in %v.\n", time.Since(start))
return nil
}

type Syncline struct {
Offset uint64
Length uint64
Hash uint64
}

func Sync(logger *log.Logger, file string, syncfile string) error {
start := time.Now()
total_remote_bytes := uint64(0)
Expand Down

0 comments on commit fdab3ef

Please sign in to comment.