From 9bc8c2247b02ef52168ec161f98d37444a7d9d42 Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Mon, 4 Sep 2023 23:25:00 +0800 Subject: [PATCH] implement pmtiles extract [#31, #52] (#62) * implement pmtiles extract [#31, #52] * Experimental cli support for extracting a region from a larger archive, given a maxzoom and GeoJSON multipolygon region. * Limited to credentialed buckets or local files now, public HTTP to come later * Limited to a single download thread * Change directory optimization to be faster and match Java implementation, affects root/leaf sizes * Finish initial extract [#31] * include the DstOffset so we can multithread downloads later * set header statistics * implement --dry-run * add logging messages for user feedback --- main.go | 20 +- pmtiles/convert.go | 4 +- pmtiles/directory.go | 43 ++-- pmtiles/downloader.go | 2 +- pmtiles/extract.go | 558 +++++++++++++++++++++++++++++++++------- pmtiles/extract_test.go | 167 ++++++++++-- pmtiles/readerv2.go | 4 +- 7 files changed, 661 insertions(+), 137 deletions(-) diff --git a/main.go b/main.go index 8df2790..c6f4a22 100644 --- a/main.go +++ b/main.go @@ -45,12 +45,13 @@ var cli struct { } `cmd:"" help:"Fetch one tile from a local or remote archive and output on stdout."` Extract struct { - Input string `arg:"" help:"Input local or remote archive."` - Output string `arg:"" help:"Output archive." type:"path"` - Bucket string `help:"Remote bucket of input archive."` - Region string `help:"local GeoJSON Polygon or MultiPolygon file for area of interest." type:"existingfile"` - Maxzoom int `help:"Maximum zoom level, inclusive."` - DryRun bool `help:"Calculate tiles to extract based on header and directories, but don't download them."` + Input string `arg:"" help:"Input local or remote archive."` + Output string `arg:"" help:"Output archive." type:"path"` + Bucket string `help:"Remote bucket of input archive."` + Region string `help:"local GeoJSON Polygon or MultiPolygon file for area of interest." type:"existingfile"` + Maxzoom uint8 `help:"Maximum zoom level, inclusive."` + DryRun bool `help:"Calculate tiles to extract, but don't download them."` + Overfetch float32 `default:0.1 help:"What ratio of extra data to download to minimize # requests; 0.2 is 20%"` } `cmd:"" help:"Create an archive from a larger archive for a subset of zoom levels or geographic region."` Verify struct { @@ -119,7 +120,10 @@ func main() { logger.Printf("Serving %s %s on port %d with Access-Control-Allow-Origin: %s\n", cli.Serve.Bucket, cli.Serve.Path, cli.Serve.Port, cli.Serve.Cors) logger.Fatal(http.ListenAndServe(":"+strconv.Itoa(cli.Serve.Port), nil)) case "extract ": - logger.Fatalf("This command is not yet implemented.") + err := pmtiles.Extract(logger, cli.Extract.Bucket, cli.Extract.Input, cli.Extract.Maxzoom, cli.Extract.Region, cli.Extract.Output, cli.Extract.Overfetch, cli.Extract.DryRun) + if err != nil { + logger.Fatalf("Failed to extract, %v", err) + } case "convert ": path := cli.Convert.Input output := cli.Convert.Output @@ -160,6 +164,8 @@ func main() { logger.Fatalf("Failed to upload file, %v", err) } case "verify ": + // check clustered + // check counts (addressed tiles, tile entries, # tile contents) logger.Fatalf("This command is not yet implemented.") case "version": fmt.Printf("pmtiles %s, commit %s, built at %s\n", version, commit, date) diff --git a/pmtiles/convert.go b/pmtiles/convert.go index 9036e56..4abc492 100644 --- a/pmtiles/convert.go +++ b/pmtiles/convert.go @@ -114,14 +114,14 @@ func Convert(logger *log.Logger, input string, output string, deduplicate bool, func add_directoryv2_entries(dir DirectoryV2, entries *[]EntryV3, f *os.File) { for zxy, rng := range dir.Entries { tile_id := ZxyToId(zxy.Z, zxy.X, zxy.Y) - *entries = append(*entries, EntryV3{tile_id, rng.Offset, rng.Length, 1}) + *entries = append(*entries, EntryV3{tile_id, rng.Offset, uint32(rng.Length), 1}) } var unique = map[uint64]uint32{} // uniqify the offset/length pairs for _, rng := range dir.Leaves { - unique[rng.Offset] = rng.Length + unique[rng.Offset] = uint32(rng.Length) } for offset, length := range unique { diff --git a/pmtiles/directory.go b/pmtiles/directory.go index 3326f71..e40c529 100644 --- a/pmtiles/directory.go +++ b/pmtiles/directory.go @@ -313,24 +313,31 @@ func build_roots_leaves(entries []EntryV3, leaf_size int) ([]byte, []byte, int) } func optimize_directories(entries []EntryV3, target_root_len int) ([]byte, []byte, int) { - test_root_bytes := serialize_entries(entries) - - // Case1: the entire directory fits into the target len - if len(test_root_bytes) <= target_root_len { - return test_root_bytes, make([]byte, 0), 0 - } else { - - // TODO: case 2: mixed tile entries/directory entries in root - - // case 3: root directory is leaf pointers only - // use an iterative method, increasing the size of the leaf directory until the root fits - leaf_size := 4096 - for { - root_bytes, leaves_bytes, num_leaves := build_roots_leaves(entries, leaf_size) - if len(root_bytes) <= target_root_len { - return root_bytes, leaves_bytes, num_leaves - } - leaf_size *= 2 + if len(entries) < 16384 { + test_root_bytes := serialize_entries(entries) + // Case1: the entire directory fits into the target len + if len(test_root_bytes) <= target_root_len { + return test_root_bytes, make([]byte, 0), 0 } } + + // TODO: case 2: mixed tile entries/directory entries in root + + // case 3: root directory is leaf pointers only + // use an iterative method, increasing the size of the leaf directory until the root fits + + var leaf_size float32 + leaf_size = float32(len(entries)) / 3500 + + if leaf_size < 4096 { + leaf_size = 4096 + } + + for { + root_bytes, leaves_bytes, num_leaves := build_roots_leaves(entries, int(leaf_size)) + if len(root_bytes) <= target_root_len { + return root_bytes, leaves_bytes, num_leaves + } + leaf_size *= 1.2 + } } diff --git a/pmtiles/downloader.go b/pmtiles/downloader.go index 873737e..b5db058 100644 --- a/pmtiles/downloader.go +++ b/pmtiles/downloader.go @@ -72,7 +72,7 @@ func DownloadParts(getter func (Range) []byte, ranges []Range, numThreads int) c // an number for overhead: 0.2 is 20% overhead, 1.0 is 100% overhead // a number of maximum chunk size: n chunks * threads is the max memory usage -// store the smallest gaps in a heap +// store the smallest gaps in a heap; merge ranges until overhead budget is reached func DownloadBatchedParts(getter func (Range) []byte, ranges []Range, overhead float32, maxSizeBytes int, numThreads int) chan []byte { orderedOutput := make(chan []byte, 8) return orderedOutput diff --git a/pmtiles/extract.go b/pmtiles/extract.go index 55ea756..745c66e 100644 --- a/pmtiles/extract.go +++ b/pmtiles/extract.go @@ -1,121 +1,499 @@ package pmtiles import ( - "encoding/json" + "bytes" + "context" + "fmt" + "github.com/RoaringBitmap/roaring/roaring64" + "github.com/dustin/go-humanize" + "github.com/paulmach/orb" + "github.com/paulmach/orb/geojson" + "github.com/schollz/progressbar/v3" + "gocloud.dev/blob" "io" + "io/ioutil" "log" "math" "os" - "strconv" + "sort" + "strings" + "time" ) -type Metadata struct { - Format string `json:"format"` - Minzoom string `json:"minzoom"` - Maxzoom string `json:"maxzoom"` - Bounds string `json:"bounds"` - Compress string `json:"compress"` - Attribution string `json:"attribution"` +type SrcDstRange struct { + SrcOffset uint64 + DstOffset uint64 + Length uint64 } -func PointToTile(z int, lng float64, lat float64) Zxy { - d2r := math.Pi / 180 - sin := math.Sin(lat * d2r) - z2 := 1 << z - x := float64(z2) * (lng/360 + 0.5) - y := float64(z2) * (0.5 - 0.25*math.Log((1+sin)/(1-sin))/math.Pi) - x = math.Mod(x, float64(z2)) - if x < 0 { - x = x + float64(z2) - } - iy := int(math.Floor(y)) - if iy > z2-1 { - iy = z2 - 1 - } - return Zxy{Z: uint8(z), X: uint32(math.Floor(x)), Y: uint32(iy)} +// given a bitmap and a set of existing entries, +// create only relevant entries +// return sorted slice of entries, and slice of all leaf entries +// any runlengths > 1 will be "trimmed" to the relevance bitmap +func RelevantEntries(bitmap *roaring64.Bitmap, maxzoom uint8, dir []EntryV3) ([]EntryV3, []EntryV3) { + last_tile := ZxyToId(maxzoom+1, 0, 0) + leaves := make([]EntryV3, 0) + tiles := make([]EntryV3, 0) + for idx, entry := range dir { + if entry.RunLength == 0 { + tmp := roaring64.New() + + // if this is the last thing in the directory, it needs to be bounded + if idx == len(dir)-1 { + tmp.AddRange(entry.TileId, last_tile) + } else { + tmp.AddRange(entry.TileId, dir[idx+1].TileId) + } + + if bitmap.Intersects(tmp) { + leaves = append(leaves, entry) + } + } else if entry.RunLength == 1 { + if bitmap.Contains(entry.TileId) { + tiles = append(tiles, entry) + } + } else { + // runlength > 1 + current_id := entry.TileId + current_runlength := uint32(0) + for y := entry.TileId; y < entry.TileId+uint64(entry.RunLength); y++ { + if bitmap.Contains(y) { + if current_runlength == 0 { + current_runlength = 1 + current_id = y + } else { + current_runlength += 1 + } + } else { + if current_runlength > 0 { + tiles = append(tiles, EntryV3{current_id, entry.Offset, entry.Length, current_runlength}) + } + current_runlength = 0 + } + } + if current_runlength > 0 { + tiles = append(tiles, EntryV3{current_id, entry.Offset, entry.Length, current_runlength}) + } + } + } + return tiles, leaves } -// given a source PMTiles archive, -// extract a subpyramid with the following parameters -// ZOOM_LEVEL, min_x, min_y, max_x, max_y -// Todo: add or replace the MASK layer with one feature - -func Matches(z uint8, minX uint32, minY uint32, maxX uint32, maxY uint32, candidate Zxy) bool { - if candidate.Z < z { - levels := z - candidate.Z - candidateMinXOnLevel := candidate.X << levels - candidateMinYOnLevel := candidate.Y << levels - candidateMaxXOnLevel := ((candidate.X + 1) << levels) - 1 - candidateMaxYOnLevel := ((candidate.Y + 1) << levels) - 1 - if candidateMaxXOnLevel < minX || candidateMaxYOnLevel < minY || candidateMinXOnLevel > maxX || candidateMinYOnLevel > maxY { - return false - } - return true - } else if candidate.Z == z { - return candidate.X >= minX && candidate.Y >= minY && candidate.X <= maxX && candidate.Y <= maxY - } else { - levels := candidate.Z - z - candidateXOnLevel := candidate.X >> levels - candidateYOnLevel := candidate.Y >> levels - return candidateXOnLevel >= minX && candidateYOnLevel >= minY && candidateXOnLevel <= maxX && candidateYOnLevel <= maxY +// Given a tile entries for a Source archive, sorted in TileID order, +// return: +// * Re-encoded tile-entries, with their offsets changed to contiguous (clustered) order in a new archive. +// * SrcDstRange: slice of offsets in the source archive, offset in the new archive, and length. +// - Each range is one or more tiles +// - the output must not have contiguous entries +// - It is sorted by new offsets, but not necessarily by source offsets +// +// * The total size of the tile section in the new archive +// * The # of addressed tiles (sum over RunLength) +// * # the number of unique offsets ("tile contents") +// - this might not be the last SrcDstRange new_offset + length, it's the highest offset (can be in the middle) +func ReencodeEntries(dir []EntryV3) ([]EntryV3, []SrcDstRange, uint64, uint64, uint64) { + reencoded := make([]EntryV3, 0, len(dir)) + seen_offsets := make(map[uint64]uint64) + ranges := make([]SrcDstRange, 0) + addressed_tiles := uint64(0) + + dst_offset := uint64(0) + for _, entry := range dir { + if val, ok := seen_offsets[entry.Offset]; ok { + reencoded = append(reencoded, EntryV3{entry.TileId, val, entry.Length, entry.RunLength}) + } else { + if len(ranges) > 0 { + last_range := ranges[len(ranges)-1] + if last_range.SrcOffset+last_range.Length == entry.Offset { + ranges[len(ranges)-1].Length += uint64(entry.Length) + } else { + ranges = append(ranges, SrcDstRange{entry.Offset, dst_offset, uint64(entry.Length)}) + } + } else { + ranges = append(ranges, SrcDstRange{entry.Offset, dst_offset, uint64(entry.Length)}) + } + + reencoded = append(reencoded, EntryV3{entry.TileId, dst_offset, entry.Length, entry.RunLength}) + seen_offsets[entry.Offset] = dst_offset + dst_offset += uint64(entry.Length) + } + + addressed_tiles += uint64(entry.RunLength) } + return reencoded, ranges, dst_offset, addressed_tiles, uint64(len(seen_offsets)) +} + +// "want the next N bytes, then discard N bytes" +type CopyDiscard struct { + Wanted uint64 + Discard uint64 +} + +type OverfetchRange struct { + Rng SrcDstRange + CopyDiscards []CopyDiscard } -func SubpyramidXY(logger *log.Logger, input string, output string, z uint8, minX uint32, minY uint32, maxX uint32, maxY uint32, bounds string) { - f, err := os.Open(input) +// A single request, where only some of the bytes +// in the requested range we want +type OverfetchListItem struct { + Rng SrcDstRange + CopyDiscards []CopyDiscard + BytesToNext uint64 // the "priority" + prev *OverfetchListItem + next *OverfetchListItem + index int +} + +// MergeRanges takes a slice of SrcDstRanges, that: +// * is non-contiguous, and is sorted by NewOffset +// * an Overfetch parameter +// - overfetch = 0.2 means we can request an extra 20% +// - overfetch = 1.00 means we can double our total transfer size +// +// Return a slice of OverfetchRanges +// +// Each OverfetchRange is one or more input ranges +// input ranges are merged in order of smallest byte distance to next range +// until the overfetch budget is consumed. +// The slice is sorted by Length +func MergeRanges(ranges []SrcDstRange, overfetch float32) []OverfetchRange { + total_size := 0 + + list := make([]*OverfetchListItem, len(ranges)) + + // create the heap items + for i, rng := range ranges { + var bytes_to_next uint64 + if i == len(ranges)-1 { + bytes_to_next = math.MaxUint64 + } else { + bytes_to_next = ranges[i+1].SrcOffset - (rng.SrcOffset + rng.Length) + if bytes_to_next < 0 { + bytes_to_next = math.MaxUint64 + } + } + + list[i] = &OverfetchListItem{ + Rng: rng, + BytesToNext: bytes_to_next, + CopyDiscards: []CopyDiscard{{uint64(rng.Length), 0}}, + } + total_size += int(rng.Length) + } + + // make the list doubly-linked + for i, item := range list { + if i > 0 { + item.prev = list[i-1] + } + if i < len(list)-1 { + item.next = list[i+1] + } + } + + overfetch_budget := int(float32(total_size) * overfetch) + + // create a 2nd slice, sorted by ascending distance to next range + shortest := make([]*OverfetchListItem, len(list)) + copy(shortest, list) + + sort.Slice(shortest, func(i, j int) bool { + return shortest[i].BytesToNext < shortest[j].BytesToNext + }) + + // while we haven't consumed the budget, merge ranges + for (len(shortest) > 1) && (overfetch_budget-int(shortest[0].BytesToNext) >= 0) { + item := shortest[0] + + // merge this item into item.next + new_length := item.Rng.Length + item.BytesToNext + item.next.Rng.Length + item.next.Rng = SrcDstRange{item.Rng.SrcOffset, item.Rng.DstOffset, new_length} + item.next.prev = item.prev + if item.prev != nil { + item.prev.next = item.next + } + item.CopyDiscards[len(item.CopyDiscards)-1].Discard = item.BytesToNext + item.next.CopyDiscards = append(item.CopyDiscards, item.next.CopyDiscards...) + + shortest = shortest[1:] + + overfetch_budget -= int(item.BytesToNext) + } + + // copy out the result structs + result := make([]OverfetchRange, len(shortest)) + + sort.Slice(shortest, func(i, j int) bool { + return shortest[i].Rng.DstOffset < shortest[j].Rng.DstOffset + }) + + for i, x := range shortest { + result[i] = OverfetchRange{ + Rng: x.Rng, + CopyDiscards: x.CopyDiscards, + } + } + + return result +} + +// 1. Get the root directory (check that it is clustered) +// 2. Turn the input geometry into a relevance bitmap (using min(maxzoom, headermaxzoom)) +// 3. Get all relevant level 1 directories (if any) +// 4. Get all relevant level 2 directories (usually none) +// 5. With the existing directory + relevance bitmap, construct +// * a new total directory (root + leaf directories) +// * a sorted slice of byte ranges in the old file required +// 6. Merge requested ranges using an overfetch parametter +// 7. write the modified header +// 8. write the root directory. +// 9. get and write the metadata. +// 10. write the leaf directories (if any) +// 11. Get all tiles, and write directly to the output. + +func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, region_file string, output string, overfetch float32, dry_run bool) error { + // 1. fetch the header + + if bucketURL == "" { + if strings.HasPrefix(file, "/") { + bucketURL = "file:///" + } else { + bucketURL = "file://" + } + } + + fmt.Println("WARNING: extract is an experimental feature and results may not be suitable for production use.") + start := time.Now() + + ctx := context.Background() + bucket, err := blob.OpenBucket(ctx, bucketURL) if err != nil { - return + return fmt.Errorf("Failed to open bucket for %s, %w", bucketURL, err) } - metadata_bytes, root_directory := ParseHeaderV2(f) + defer bucket.Close() - var metadata Metadata - json.Unmarshal(metadata_bytes, &metadata) - metadata.Maxzoom = strconv.Itoa(int(z)) - metadata.Bounds = bounds + r, err := bucket.NewRangeReader(ctx, file, 0, HEADERV3_LEN_BYTES, nil) - // writer := NewWriter(output) + if err != nil { + return fmt.Errorf("Failed to create range reader for %s, %w", file, err) + } + b, err := io.ReadAll(r) + if err != nil { + return err + } + r.Close() - if z >= root_directory.LeafZ { - for key, rng := range root_directory.Leaves { - if Matches(z, minX, minY, maxX, maxY, key) { - _, err = f.Seek(int64(rng.Offset), 0) - if err != nil { - panic("I/O error") - } + header, err := deserialize_header(b[0:HEADERV3_LEN_BYTES]) - dir_bytes := make([]byte, rng.Length) - io.ReadFull(f, dir_bytes) - - for i := 0; i < len(dir_bytes)/17; i++ { - leaf_z, lzxy, lrng := ParseEntryV2(dir_bytes[i*17 : i*17+17]) - if leaf_z == 0 { - if lzxy.Z <= z && Matches(z, minX, minY, maxX, maxY, lzxy) { - _, err = f.Seek(int64(lrng.Offset), 0) - if err != nil { - return - } - tile_data := make([]byte, lrng.Length) - io.ReadFull(f, tile_data) - // writer.WriteTile(lzxy, tile_data) - } - } - } + if !header.Clustered { + return fmt.Errorf("Error: source archive must be clustered for extracts.") + } + + source_metadata_offset := header.MetadataOffset + source_tile_data_offset := header.TileDataOffset + + if header.MaxZoom < maxzoom || maxzoom == 0 { + maxzoom = header.MaxZoom + } + + // 2. construct a relevance bitmap + dat, _ := ioutil.ReadFile(region_file) + f, _ := geojson.UnmarshalFeature(dat) + + var multipolygon orb.MultiPolygon + switch v := f.Geometry.(type) { + case orb.Polygon: + multipolygon = []orb.Polygon{v} + case orb.MultiPolygon: + multipolygon = v + } + + bound := multipolygon.Bound() + + boundary_set, interior_set := bitmapMultiPolygon(maxzoom, multipolygon) + + relevant_set := boundary_set + relevant_set.Or(interior_set) + generalizeOr(relevant_set) + + // 3. get relevant entries from root + dir_offset := header.RootOffset + dir_length := header.RootLength + + root_reader, err := bucket.NewRangeReader(ctx, file, int64(dir_offset), int64(dir_length), nil) + if err != nil { + return err + } + defer root_reader.Close() + root_bytes, err := io.ReadAll(root_reader) + if err != nil { + return err + } + + root_dir := deserialize_entries(bytes.NewBuffer(root_bytes)) + + tile_entries, leaves := RelevantEntries(relevant_set, maxzoom, root_dir) + + // 4. get all relevant leaf entries + + leaf_ranges := make([]SrcDstRange, 0) + for _, leaf := range leaves { + leaf_ranges = append(leaf_ranges, SrcDstRange{header.LeafDirectoryOffset + leaf.Offset, 0, uint64(leaf.Length)}) + } + + overfetch_leaves := MergeRanges(leaf_ranges, overfetch) + fmt.Printf("fetching %d dirs, %d chunks, %d requests\n", len(leaves), len(leaf_ranges), len(overfetch_leaves)) + + for _, or := range overfetch_leaves { + + slab_r, err := bucket.NewRangeReader(ctx, file, int64(or.Rng.SrcOffset), int64(or.Rng.Length), nil) + if err != nil { + return err + } + + for _, cd := range or.CopyDiscards { + + leaf_bytes := make([]byte, cd.Wanted) + _, err := io.ReadFull(slab_r, leaf_bytes) + if err != nil { + return err + } + leafdir := deserialize_entries(bytes.NewBuffer(leaf_bytes)) + new_entries, new_leaves := RelevantEntries(relevant_set, maxzoom, leafdir) + + if len(new_leaves) > 0 { + panic("This doesn't support leaf level 2+.") + } + tile_entries = append(tile_entries, new_entries...) + + _, err = io.CopyN(io.Discard, slab_r, int64(cd.Discard)) + if err != nil { + return err } } + slab_r.Close() } - for key, rng := range root_directory.Entries { - if Matches(z, minX, minY, maxX, maxY, key) { - _, err = f.Seek(int64(rng.Offset), 0) + + sort.Slice(tile_entries, func(i, j int) bool { + return tile_entries[i].TileId < tile_entries[j].TileId + }) + + fmt.Printf("Region tiles %d, result tile entries %d\n", relevant_set.GetCardinality(), len(tile_entries)) + + // 6. create the new header and chunk list + // we now need to re-encode this entry list using cumulative offsets + reencoded, tile_parts, tiledata_length, addressed_tiles, tile_contents := ReencodeEntries(tile_entries) + + overfetch_ranges := MergeRanges(tile_parts, overfetch) + fmt.Printf("fetching %d tiles, %d chunks, %d requests\n", len(reencoded), len(tile_parts), len(overfetch_ranges)) + + // construct the directories + new_root_bytes, new_leaves_bytes, _ := optimize_directories(reencoded, 16384-HEADERV3_LEN_BYTES) + + // 7. write the modified header + header.RootOffset = HEADERV3_LEN_BYTES + header.RootLength = uint64(len(new_root_bytes)) + header.MetadataOffset = header.RootOffset + header.RootLength + header.LeafDirectoryOffset = header.MetadataOffset + header.MetadataLength + header.LeafDirectoryLength = uint64(len(new_leaves_bytes)) + header.TileDataOffset = header.LeafDirectoryOffset + header.LeafDirectoryLength + + header.TileDataLength = tiledata_length + header.AddressedTilesCount = addressed_tiles + header.TileEntriesCount = uint64(len(tile_entries)) + header.TileContentsCount = tile_contents + + header.MinLonE7 = int32(bound.Left() * 10000000) + header.MinLatE7 = int32(bound.Bottom() * 10000000) + header.MaxLonE7 = int32(bound.Right() * 10000000) + header.MaxLatE7 = int32(bound.Top() * 10000000) + header.CenterLonE7 = int32(bound.Center().X() * 10000000) + header.CenterLatE7 = int32(bound.Center().Y() * 10000000) + header.MaxZoom = maxzoom + + header_bytes := serialize_header(header) + + total_bytes := uint64(0) + for _, x := range overfetch_ranges { + total_bytes += x.Rng.Length + } + + if !dry_run { + + outfile, err := os.Create(output) + defer outfile.Close() + _, err = outfile.Write(header_bytes) + if err != nil { + return err + } + + // 8. write the root directory + _, err = outfile.Write(new_root_bytes) + if err != nil { + return err + } + + // 9. get and write the metadata + metadata_reader, err := bucket.NewRangeReader(ctx, file, int64(source_metadata_offset), int64(header.MetadataLength), nil) + if err != nil { + return err + } + metadata_bytes, err := io.ReadAll(metadata_reader) + defer metadata_reader.Close() + if err != nil { + return err + } + + outfile.Write(metadata_bytes) + + // 10. write the leaf directories + _, err = outfile.Write(new_leaves_bytes) + if err != nil { + return err + } + + bar := progressbar.DefaultBytes( + int64(total_bytes), + "fetching chunks", + ) + + for _, or := range overfetch_ranges { + + tile_r, err := bucket.NewRangeReader(ctx, file, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length), nil) if err != nil { - return + return err + } + + for _, cd := range or.CopyDiscards { + _, err := io.CopyN(io.MultiWriter(outfile, bar), tile_r, int64(cd.Wanted)) + if err != nil { + return err + } + + _, err = io.CopyN(io.MultiWriter(io.Discard, bar), tile_r, int64(cd.Discard)) + if err != nil { + return err + } } - tile_data := make([]byte, rng.Length) - io.ReadFull(f, tile_data) - // writer.WriteTile(key, tile_data) + tile_r.Close() } } - new_metadata_bytes, _ := json.Marshal(metadata) - _ = new_metadata_bytes - // writer.Finalize(new_metadata_bytes) + total_actual_bytes := uint64(0) + for _, x := range tile_parts { + total_actual_bytes += x.Length + } + + fmt.Printf("Completed in %v seconds with 1 download thread.\n", time.Since(start)) + total_requests := 2 // header + root + total_requests += len(overfetch_leaves) // leaves + total_requests += 1 // metadata + total_requests += len(overfetch_ranges) + fmt.Printf("Extract required %d total requests.\n", total_requests) + fmt.Printf("Extract transferred %s (overfetch %v) for an archive size of %s\n", humanize.Bytes(total_bytes), overfetch, humanize.Bytes(total_actual_bytes)) + fmt.Println("Verify your extract is usable at https://protomaps.github.io/PMTiles/") + fmt.Println("Feedback wanted! report your success or failure to https://github.com/protomaps/go-pmtiles/issues") + + return nil } diff --git a/pmtiles/extract_test.go b/pmtiles/extract_test.go index b809e4f..7464b9c 100644 --- a/pmtiles/extract_test.go +++ b/pmtiles/extract_test.go @@ -1,31 +1,164 @@ package pmtiles import ( + "fmt" + "github.com/RoaringBitmap/roaring/roaring64" "github.com/stretchr/testify/assert" "testing" ) -func TestPointToTile(t *testing.T) { - assert.Equal(t,Zxy{4, 8, 8},PointToTile(4, 0, 0)) - assert.Equal(t,Zxy{4, 0, 15},PointToTile(4, -180, -85)) - assert.Equal(t,Zxy{4, 0, 0},PointToTile(4, -180, 85)) - assert.Equal(t, Zxy{4, 15, 15},PointToTile(4, 179.999, -85)) - assert.Equal(t, Zxy{4, 15, 0},PointToTile(4, 179.999, 85)) +func TestRelevantEntries(t *testing.T) { + entries := make([]EntryV3, 0) + entries = append(entries, EntryV3{0, 0, 0, 1}) + + bitmap := roaring64.New() + bitmap.Add(0) + + tiles, leaves := RelevantEntries(bitmap, 4, entries) + + assert.Equal(t, len(tiles), 1) + assert.Equal(t, len(leaves), 0) +} + +func TestRelevantEntriesRunLength(t *testing.T) { + entries := make([]EntryV3, 0) + entries = append(entries, EntryV3{0, 0, 0, 5}) + + bitmap := roaring64.New() + bitmap.Add(1) + bitmap.Add(2) + bitmap.Add(4) + + tiles, leaves := RelevantEntries(bitmap, 4, entries) + + assert.Equal(t, len(tiles), 2) + assert.Equal(t, tiles[0].RunLength, uint32(2)) + assert.Equal(t, tiles[1].RunLength, uint32(1)) + assert.Equal(t, len(leaves), 0) +} + +func TestRelevantEntriesLeaf(t *testing.T) { + entries := make([]EntryV3, 0) + entries = append(entries, EntryV3{0, 0, 0, 0}) + + bitmap := roaring64.New() + bitmap.Add(1) + + tiles, leaves := RelevantEntries(bitmap, 4, entries) + + assert.Equal(t, len(tiles), 0) + assert.Equal(t, len(leaves), 1) } -func TestMatchSameLevel(t *testing.T) { - // same level - assert.True(t, Matches(0, 0, 0, 0, 0, Zxy{0, 0, 0})) - assert.True(t, Matches(4, 0, 0, 8, 8, Zxy{4, 4, 4})) +func TestRelevantEntriesNotLeaf(t *testing.T) { + entries := make([]EntryV3, 0) + entries = append(entries, EntryV3{0, 0, 0, 0}) + entries = append(entries, EntryV3{2, 0, 0, 1}) + entries = append(entries, EntryV3{4, 0, 0, 0}) + + bitmap := roaring64.New() + bitmap.Add(3) + + tiles, leaves := RelevantEntries(bitmap, 4, entries) + + assert.Equal(t, len(tiles), 0) + assert.Equal(t, len(leaves), 0) } -func TestMatchCandidateLower(t *testing.T) { - assert.True(t,Matches(4, 0, 0, 8, 8, Zxy{0, 0, 0})) - assert.True(t,Matches(4, 15, 15, 15, 15, Zxy{0, 0, 0})) - assert.False(t,Matches(4, 15, 15, 15, 15, Zxy{1, 0, 0})) - assert.True(t,Matches(4, 15, 15, 15, 15, Zxy{1, 1, 1})) +func TestRelevantEntriesMaxZoom(t *testing.T) { + entries := make([]EntryV3, 0) + entries = append(entries, EntryV3{0, 0, 0, 0}) + + bitmap := roaring64.New() + bitmap.Add(6) + _, leaves := RelevantEntries(bitmap, 1, entries) + assert.Equal(t, len(leaves), 0) + + _, leaves = RelevantEntries(bitmap, 2, entries) + assert.Equal(t, len(leaves), 1) } -func TestMatchCandidateHigher(t *testing.T) { - assert.True(t,Matches(4, 0, 0, 8, 8, Zxy{8, 0, 0})) +func TestReencodeEntries(t *testing.T) { + entries := make([]EntryV3, 0) + entries = append(entries, EntryV3{0, 400, 10, 1}) + entries = append(entries, EntryV3{1, 500, 20, 2}) + + reencoded, result, datalen, addressed, contents := ReencodeEntries(entries) + + assert.Equal(t, 2, len(result)) + assert.Equal(t, result[0].SrcOffset, uint64(400)) + assert.Equal(t, result[0].Length, uint64(10)) + assert.Equal(t, result[1].SrcOffset, uint64(500)) + assert.Equal(t, result[1].Length, uint64(20)) + + assert.Equal(t, 2, len(reencoded)) + assert.Equal(t, reencoded[0].Offset, uint64(0)) + assert.Equal(t, reencoded[1].Offset, uint64(10)) + + assert.Equal(t, uint64(30), datalen) + assert.Equal(t, uint64(3), addressed) + assert.Equal(t, uint64(2), contents) +} + +func TestReencodeEntriesDuplicate(t *testing.T) { + entries := make([]EntryV3, 0) + entries = append(entries, EntryV3{0, 400, 10, 1}) + entries = append(entries, EntryV3{1, 500, 20, 1}) + entries = append(entries, EntryV3{2, 400, 10, 1}) + + reencoded, result, datalen, addressed, contents := ReencodeEntries(entries) + + assert.Equal(t, 2, len(result)) + assert.Equal(t, result[0].SrcOffset, uint64(400)) + assert.Equal(t, result[0].Length, uint64(10)) + assert.Equal(t, result[1].SrcOffset, uint64(500)) + assert.Equal(t, result[1].Length, uint64(20)) + + assert.Equal(t, len(reencoded), 3) + assert.Equal(t, reencoded[0].Offset, uint64(0)) + assert.Equal(t, reencoded[1].Offset, uint64(10)) + assert.Equal(t, reencoded[2].Offset, uint64(0)) + + assert.Equal(t, uint64(30), datalen) + assert.Equal(t, uint64(3), addressed) + assert.Equal(t, uint64(2), contents) +} + +func TestReencodeContiguous(t *testing.T) { + entries := make([]EntryV3, 0) + entries = append(entries, EntryV3{0, 400, 10, 0}) + entries = append(entries, EntryV3{1, 410, 20, 0}) + + _, result, _, _, _ := ReencodeEntries(entries) + + assert.Equal(t, len(result), 1) + assert.Equal(t, result[0].SrcOffset, uint64(400)) + assert.Equal(t, result[0].Length, uint64(30)) +} + +func TestMergeRanges(t *testing.T) { + ranges := make([]SrcDstRange, 0) + ranges = append(ranges, SrcDstRange{0, 0, 50}) + ranges = append(ranges, SrcDstRange{60, 60, 60}) + + result := MergeRanges(ranges, 0.1) + + assert.Equal(t, 1, len(result)) + assert.Equal(t, SrcDstRange{0, 0, 120}, result[0].Rng) + assert.Equal(t, 2, len(result[0].CopyDiscards)) + assert.Equal(t, CopyDiscard{50, 10}, result[0].CopyDiscards[0]) + assert.Equal(t, CopyDiscard{60, 0}, result[0].CopyDiscards[1]) +} + +func TestMergeRangesMultiple(t *testing.T) { + ranges := make([]SrcDstRange, 0) + ranges = append(ranges, SrcDstRange{0, 0, 50}) + ranges = append(ranges, SrcDstRange{60, 60, 10}) + ranges = append(ranges, SrcDstRange{80, 80, 10}) + + result := MergeRanges(ranges, 0.3) + assert.Equal(t, 1, len(result)) + assert.Equal(t, SrcDstRange{0, 0, 90}, result[0].Rng) + assert.Equal(t, 3, len(result[0].CopyDiscards)) + fmt.Println(result) } diff --git a/pmtiles/readerv2.go b/pmtiles/readerv2.go index befcaef..62c7dcc 100644 --- a/pmtiles/readerv2.go +++ b/pmtiles/readerv2.go @@ -14,7 +14,7 @@ type Zxy struct { type Range struct { Offset uint64 - Length uint32 + Length uint64 } type DirectoryV2 struct { @@ -51,7 +51,7 @@ func ParseEntryV2(b []byte) (uint8, Zxy, Range) { x := readUint24(x_raw) y := readUint24(y_raw) offset := readUint48(offset_raw) - length := binary.LittleEndian.Uint32(length_raw) + length := uint64(binary.LittleEndian.Uint32(length_raw)) if z_raw&0b10000000 == 0 { return 0, Zxy{Z: uint8(z_raw), X: uint32(x), Y: uint32(y)}, Range{Offset: offset, Length: length} } else {