Support public HTTP buckets (#74)
* All operations support public HTTP endpoints [#68]

* fix normalization of local paths via filepath.Abs
* force the parameter awssdk=v2 for s3 buckets to use newer SDK

* handle maxzoom-only extracts [#68, #64]

* fix default maxzoom; support raw geojson geometries [#68, #64]
bdon authored Sep 11, 2023
1 parent a296345 commit 8cc17b2
Showing 8 changed files with 326 additions and 116 deletions.
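As a quick orientation (not part of the commit itself), the sketch below shows how the new bucket abstraction added in pmtiles/bucket.go might be used to read the fixed-length header of an archive served from a plain public HTTP endpoint. The URL is made up, the import path is assumed to be the repository's module path, and error handling is kept minimal.

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/protomaps/go-pmtiles/pmtiles" // assumed module path
)

func main() {
	ctx := context.Background()

	// Hypothetical public archive; NormalizeBucketKey splits the URL into a
	// base ("https://example.com/tiles") and a key ("planet.pmtiles").
	bucketURL, key, err := pmtiles.NormalizeBucketKey("", "", "https://example.com/tiles/planet.pmtiles")
	if err != nil {
		log.Fatal(err)
	}

	// OpenBucket returns the plain-HTTP implementation for http(s) URLs and a
	// gocloud.dev blob adapter for everything else (file://, s3://, ...).
	bucket, err := pmtiles.OpenBucket(ctx, bucketURL, "")
	if err != nil {
		log.Fatal(err)
	}
	defer bucket.Close()

	// Ranged read of the archive header; HttpBucket translates this into a
	// Range: bytes=... GET against the base URL plus the key.
	r, err := bucket.NewRangeReader(ctx, key, 0, pmtiles.HEADERV3_LEN_BYTES)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	b, err := io.ReadAll(r)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("read", len(b), "header bytes")
}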
2 changes: 1 addition & 1 deletion main.go
@@ -49,7 +49,7 @@ var cli struct {
Output string `arg:"" help:"Output archive." type:"path"`
Bucket string `help:"Remote bucket of input archive."`
Region string `help:"local GeoJSON Polygon or MultiPolygon file for area of interest." type:"existingfile"`
Maxzoom uint8 `help:"Maximum zoom level, inclusive."`
Maxzoom int8 `default:-1 help:"Maximum zoom level, inclusive."`
DownloadThreads int `default:4 help:"Number of download threads."`
DryRun bool `help:"Calculate tiles to extract, but don't download them."`
Overfetch float32 `default:0.1 help:"What ratio of extra data to download to minimize # requests; 0.2 is 20%"`
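For context on the Maxzoom change above: switching the flag from uint8 to int8 with a -1 default makes -1 a "no limit" sentinel, which extract.go later in this commit resolves against the archive's header (snippet copied from that file):

	if maxzoom == -1 || int8(header.MaxZoom) < maxzoom {
		maxzoom = int8(header.MaxZoom)
	}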
122 changes: 122 additions & 0 deletions pmtiles/bucket.go
@@ -0,0 +1,122 @@
package pmtiles

import (
"context"
"fmt"
"gocloud.dev/blob"
"io"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
)

type Bucket interface {
Close() error
NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error)
}

type HttpBucket struct {
baseURL string
}

func (b HttpBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
reqURL := b.baseURL + "/" + key

req, err := http.NewRequest("GET", reqURL, nil)
if err != nil {
return nil, err
}

req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close()
return nil, fmt.Errorf("HTTP error: %d", resp.StatusCode)
}

return resp.Body, nil
}

func (b HttpBucket) Close() error {
return nil
}

type BucketAdapter struct {
Bucket *blob.Bucket
}

func (ba BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, nil)
if err != nil {
return nil, err
}
return reader, nil
}

func (ba BucketAdapter) Close() error {
return ba.Bucket.Close()
}

func NormalizeBucketKey(bucket string, prefix string, key string) (string, string, error) {
if bucket == "" {
if strings.HasPrefix(key, "http") {
u, err := url.Parse(key)
if err != nil {
return "", "", err
}
dir, file := path.Split(u.Path)
if strings.HasSuffix(dir, "/") {
dir = dir[:len(dir)-1]
}
return u.Scheme + "://" + u.Host + dir, file, nil
} else {
if prefix != "" {
abs, err := filepath.Abs(prefix)
if err != nil {
return "", "", err
}
return "file://" + abs, key, nil
}
abs, err := filepath.Abs(key)
if err != nil {
return "", "", err
}
return "file://" + filepath.Dir(abs), filepath.Base(abs), nil
}
}

if strings.HasPrefix(bucket, "s3") {
u, err := url.Parse(bucket)
if err != nil {
fmt.Println("Error parsing URL:", err)
return "", "", err
}
values := u.Query()
values.Set("awssdk", "v2")
u.RawQuery = values.Encode()
return u.String(), key, nil
}
return bucket, key, nil
}

func OpenBucket(ctx context.Context, bucketURL string, bucketPrefix string) (Bucket, error) {
if strings.HasPrefix(bucketURL, "http") {
bucket := HttpBucket{bucketURL}
return bucket, nil
} else {
bucket, err := blob.OpenBucket(ctx, bucketURL)
if bucketPrefix != "/" && bucketPrefix != "." {
bucket = blob.PrefixedBucket(bucket, path.Clean(bucketPrefix)+string(os.PathSeparator))
}
wrapped_bucket := BucketAdapter{bucket}
return wrapped_bucket, err
}
}
35 changes: 35 additions & 0 deletions pmtiles/bucket_test.go
@@ -0,0 +1,35 @@
package pmtiles

import (
"github.com/stretchr/testify/assert"
"testing"
"strings"
"fmt"
)


func TestNormalizeLocalFile(t *testing.T) {
bucket, key, _ := NormalizeBucketKey("", "", "../foo/bar.pmtiles")
assert.Equal(t, "bar.pmtiles", key)
assert.True(t, strings.HasSuffix(bucket, "/foo"))
assert.True(t, strings.HasPrefix(bucket, "file://"))
}

func TestNormalizeHttp(t *testing.T) {
bucket, key, _ := NormalizeBucketKey("", "", "http://example.com/foo/bar.pmtiles")
assert.Equal(t, "bar.pmtiles", key)
assert.Equal(t, "http://example.com/foo", bucket)
}

func TestNormalizeAwsSdkVersion(t *testing.T) {
bucket, key, _ := NormalizeBucketKey("s3://mybucket?awssdk=v1&endpoint=https://foo.bar", "", "abc")
assert.Equal(t, "abc", key)
assert.Equal(t, "s3://mybucket?awssdk=v2&endpoint=https%3A%2F%2Ffoo.bar", bucket)
}

func TestNormalizePathPrefixServer(t *testing.T) {
bucket, key, _ := NormalizeBucketKey("", "../foo", "")
assert.Equal(t, "", key)
fmt.Println(bucket)
assert.True(t, strings.HasSuffix(bucket, "/foo"))
assert.True(t, strings.HasPrefix(bucket, "file://"))
}
106 changes: 55 additions & 51 deletions pmtiles/extract.go
Expand Up @@ -2,23 +2,19 @@ package pmtiles

import (
"bytes"
"context"
"container/list"
"context"
"fmt"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/dustin/go-humanize"
"github.com/paulmach/orb"
"github.com/paulmach/orb/geojson"
"github.com/schollz/progressbar/v3"
"gocloud.dev/blob"
"golang.org/x/sync/errgroup"
"io"
"io/ioutil"
"log"
"math"
"os"
"sort"
"strings"
"sync"
"time"
)
@@ -252,31 +248,34 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) (*list.List, uint64) {
// 10. write the leaf directories (if any)
// 11. Get all tiles, and write directly to the output.

func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, region_file string, output string, download_threads int, overfetch float32, dry_run bool) error {
func Extract(logger *log.Logger, bucketURL string, key string, maxzoom int8, region_file string, output string, download_threads int, overfetch float32, dry_run bool) error {
// 1. fetch the header

if bucketURL == "" {
if strings.HasPrefix(file, "/") {
bucketURL = "file:///"
} else {
bucketURL = "file://"
}
}

fmt.Println("WARNING: extract is an experimental feature and results may not be suitable for production use.")
start := time.Now()

ctx := context.Background()
bucket, err := blob.OpenBucket(ctx, bucketURL)

bucketURL, key, err := NormalizeBucketKey(bucketURL, "", key)

if err != nil {
return err
}

bucket, err := OpenBucket(ctx, bucketURL, "")

if err != nil {
return err
}

if err != nil {
return fmt.Errorf("Failed to open bucket for %s, %w", bucketURL, err)
}
defer bucket.Close()

r, err := bucket.NewRangeReader(ctx, file, 0, HEADERV3_LEN_BYTES, nil)
r, err := bucket.NewRangeReader(ctx, key, 0, HEADERV3_LEN_BYTES)

if err != nil {
return fmt.Errorf("Failed to create range reader for %s, %w", file, err)
return fmt.Errorf("Failed to create range reader for %s, %w", key, err)
}
b, err := io.ReadAll(r)
if err != nil {
@@ -293,35 +292,46 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
source_metadata_offset := header.MetadataOffset
source_tile_data_offset := header.TileDataOffset

if header.MaxZoom < maxzoom || maxzoom == 0 {
maxzoom = header.MaxZoom
}

// 2. construct a relevance bitmap
dat, _ := ioutil.ReadFile(region_file)
f, _ := geojson.UnmarshalFeature(dat)
fmt.Println(maxzoom)

var multipolygon orb.MultiPolygon
switch v := f.Geometry.(type) {
case orb.Polygon:
multipolygon = []orb.Polygon{v}
case orb.MultiPolygon:
multipolygon = v
if maxzoom == -1 || int8(header.MaxZoom) < maxzoom {
maxzoom = int8(header.MaxZoom)
}

bound := multipolygon.Bound()
var relevant_set *roaring64.Bitmap
if region_file != "" {

// 2. construct a relevance bitmap
dat, _ := ioutil.ReadFile(region_file)
multipolygon, err := UnmarshalRegion(dat)

boundary_set, interior_set := bitmapMultiPolygon(maxzoom, multipolygon)
if err != nil {
return err
}

relevant_set := boundary_set
relevant_set.Or(interior_set)
generalizeOr(relevant_set)
bound := multipolygon.Bound()

boundary_set, interior_set := bitmapMultiPolygon(uint8(maxzoom), multipolygon)
relevant_set = boundary_set
relevant_set.Or(interior_set)
generalizeOr(relevant_set)

header.MinLonE7 = int32(bound.Left() * 10000000)
header.MinLatE7 = int32(bound.Bottom() * 10000000)
header.MaxLonE7 = int32(bound.Right() * 10000000)
header.MaxLatE7 = int32(bound.Top() * 10000000)
header.CenterLonE7 = int32(bound.Center().X() * 10000000)
header.CenterLatE7 = int32(bound.Center().Y() * 10000000)
} else {
relevant_set = roaring64.New()
relevant_set.AddRange(0, ZxyToId(uint8(maxzoom)+1, 0, 0))
}

// 3. get relevant entries from root
dir_offset := header.RootOffset
dir_length := header.RootLength

root_reader, err := bucket.NewRangeReader(ctx, file, int64(dir_offset), int64(dir_length), nil)
root_reader, err := bucket.NewRangeReader(ctx, key, int64(dir_offset), int64(dir_length))
if err != nil {
return err
}
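An aside on the maxzoom-only branch above (this rests on an assumption about tile-ID ordering that the commit itself does not state): AddRange(0, ZxyToId(maxzoom+1, 0, 0)) covers every tile through maxzoom if IDs are assigned zoom by zoom starting at (0,0), because zooms 0..z contain (4^(z+1)-1)/3 tiles in total. A small sketch of that count:

	// Count of tiles at zooms 0..z; under the zoom-major ID assumption this
	// equals ZxyToId(z+1, 0, 0), the first ID of zoom z+1.
	func tilesThroughZoom(z uint8) uint64 {
		var total uint64
		for i := uint8(0); i <= z; i++ {
			total += uint64(1) << (2 * i) // 4^i tiles at zoom i
		}
		return total
	}

	// e.g. tilesThroughZoom(3) == 1+4+16+64 == 85, so a maxzoom-3 extract with
	// no region would mark tile IDs 0 through 84.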
@@ -333,7 +343,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r

root_dir := deserialize_entries(bytes.NewBuffer(root_bytes))

tile_entries, leaves := RelevantEntries(relevant_set, maxzoom, root_dir)
tile_entries, leaves := RelevantEntries(relevant_set, uint8(maxzoom), root_dir)

// 4. get all relevant leaf entries

@@ -352,7 +362,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
}
or := overfetch_leaves.Remove(overfetch_leaves.Front()).(OverfetchRange)

slab_r, err := bucket.NewRangeReader(ctx, file, int64(or.Rng.SrcOffset), int64(or.Rng.Length), nil)
slab_r, err := bucket.NewRangeReader(ctx, key, int64(or.Rng.SrcOffset), int64(or.Rng.Length))
if err != nil {
return err
}
@@ -365,7 +375,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
return err
}
leafdir := deserialize_entries(bytes.NewBuffer(leaf_bytes))
new_entries, new_leaves := RelevantEntries(relevant_set, maxzoom, leafdir)
new_entries, new_leaves := RelevantEntries(relevant_set, uint8(maxzoom), leafdir)

if len(new_leaves) > 0 {
panic("This doesn't support leaf level 2+.")
@@ -412,13 +422,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
header.TileEntriesCount = uint64(len(tile_entries))
header.TileContentsCount = tile_contents

header.MinLonE7 = int32(bound.Left() * 10000000)
header.MinLatE7 = int32(bound.Bottom() * 10000000)
header.MaxLonE7 = int32(bound.Right() * 10000000)
header.MaxLatE7 = int32(bound.Top() * 10000000)
header.CenterLonE7 = int32(bound.Center().X() * 10000000)
header.CenterLatE7 = int32(bound.Center().Y() * 10000000)
header.MaxZoom = maxzoom
header.MaxZoom = uint8(maxzoom)

header_bytes := serialize_header(header)

@@ -446,7 +450,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
}

// 9. get and write the metadata
metadata_reader, err := bucket.NewRangeReader(ctx, file, int64(source_metadata_offset), int64(header.MetadataLength), nil)
metadata_reader, err := bucket.NewRangeReader(ctx, key, int64(source_metadata_offset), int64(header.MetadataLength))
if err != nil {
return err
}
@@ -472,7 +476,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
var mu sync.Mutex

downloadPart := func(or OverfetchRange) error {
tile_r, err := bucket.NewRangeReader(ctx, file, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length), nil)
tile_r, err := bucket.NewRangeReader(ctx, key, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length))
if err != nil {
return err
}
@@ -533,9 +537,9 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
}

fmt.Printf("Completed in %v with %v download threads.\n", time.Since(start), download_threads)
total_requests := 2 // header + root
total_requests := 2 // header + root
total_requests += num_overfetch_leaves // leaves
total_requests += 1 // metadata
total_requests += 1 // metadata
total_requests += num_overfetch_ranges
fmt.Printf("Extract required %d total requests.\n", total_requests)
fmt.Printf("Extract transferred %s (overfetch %v) for an archive size of %s\n", humanize.Bytes(total_bytes), overfetch, humanize.Bytes(total_actual_bytes))