Support public HTTP buckets #74

Merged (3 commits) on Sep 11, 2023
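
In summary: the PR introduces a small Bucket abstraction in pmtiles/bucket.go with two implementations, HttpBucket (plain HTTP Range requests against a public HTTP/HTTPS server) and BucketAdapter (a wrapper around a gocloud.dev blob.Bucket), plus a NormalizeBucketKey helper that splits a local path, http(s) URL, or s3 URL into a bucket URL and a key. extract.go is rewired to go through this interface, so input archives can be read from a public HTTP server, a cloud bucket, or the local filesystem.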
2 changes: 1 addition & 1 deletion main.go
@@ -49,7 +49,7 @@ var cli struct {
Output string `arg:"" help:"Output archive." type:"path"`
Bucket string `help:"Remote bucket of input archive."`
Region string `help:"local GeoJSON Polygon or MultiPolygon file for area of interest." type:"existingfile"`
Maxzoom uint8 `help:"Maximum zoom level, inclusive."`
Maxzoom int8 `default:-1 help:"Maximum zoom level, inclusive."`
DownloadThreads int `default:4 help:"Number of download threads."`
DryRun bool `help:"Calculate tiles to extract, but don't download them."`
Overfetch float32 `default:0.1 help:"What ratio of extra data to download to minimize # requests; 0.2 is 20%"`
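
The only change to main.go is the Maxzoom flag: the first Maxzoom line above is the removed field (uint8, no default) and the second is its replacement (int8 with a default of -1). As the extract.go changes below show, -1 is a sentinel meaning "use the archive's own maxzoom".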
122 changes: 122 additions & 0 deletions pmtiles/bucket.go
@@ -0,0 +1,122 @@
package pmtiles

import (
"context"
"fmt"
"gocloud.dev/blob"
"io"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
)

type Bucket interface {
Close() error
NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error)
}

type HttpBucket struct {
baseURL string
}

func (b HttpBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
reqURL := b.baseURL + "/" + key

req, err := http.NewRequest("GET", reqURL, nil)
if err != nil {
return nil, err
}

req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close()
return nil, fmt.Errorf("HTTP error: %d", resp.StatusCode)
}

return resp.Body, nil
}

func (b HttpBucket) Close() error {
return nil
}

type BucketAdapter struct {
Bucket *blob.Bucket
}

func (ba BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, nil)
if err != nil {
return nil, err
}
return reader, nil
}

func (ba BucketAdapter) Close() error {
return ba.Bucket.Close()
}

func NormalizeBucketKey(bucket string, prefix string, key string) (string, string, error) {
if bucket == "" {
if strings.HasPrefix(key, "http") {
u, err := url.Parse(key)
if err != nil {
return "", "", err
}
dir, file := path.Split(u.Path)
if strings.HasSuffix(dir, "/") {
dir = dir[:len(dir)-1]
}
return u.Scheme + "://" + u.Host + dir, file, nil
} else {
if prefix != "" {
abs, err := filepath.Abs(prefix)
if err != nil {
return "", "", err
}
return "file://" + abs, key, nil
}
abs, err := filepath.Abs(key)
if err != nil {
return "", "", err
}
return "file://" + filepath.Dir(abs), filepath.Base(abs), nil
}
}

if strings.HasPrefix(bucket, "s3") {
u, err := url.Parse(bucket)
if err != nil {
fmt.Println("Error parsing URL:", err)
return "", "", err
}
values := u.Query()
values.Set("awssdk", "v2")
u.RawQuery = values.Encode()
return u.String(), key, nil
}
return bucket, key, nil
}

func OpenBucket(ctx context.Context, bucketURL string, bucketPrefix string) (Bucket, error) {
if strings.HasPrefix(bucketURL, "http") {
bucket := HttpBucket{bucketURL}
return bucket, nil
} else {
bucket, err := blob.OpenBucket(ctx, bucketURL)
if bucketPrefix != "/" && bucketPrefix != "." {
bucket = blob.PrefixedBucket(bucket, path.Clean(bucketPrefix)+string(os.PathSeparator))
}
wrapped_bucket := BucketAdapter{bucket}
return wrapped_bucket, err
}
}
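
For orientation (not part of the diff): a minimal sketch of how the new pieces compose to read the fixed-size header of an archive served from a plain HTTP server. The URL is hypothetical, the import path assumes the module is github.com/protomaps/go-pmtiles, and HEADERV3_LEN_BYTES is the existing header-length constant used by extract.go below; everything else comes from bucket.go above.

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/protomaps/go-pmtiles/pmtiles"
)

func main() {
	ctx := context.Background()

	// Split a hypothetical archive URL into a bucket URL and a key:
	// bucketURL == "https://example.com/tiles", key == "planet.pmtiles".
	bucketURL, key, err := pmtiles.NormalizeBucketKey("", "", "https://example.com/tiles/planet.pmtiles")
	if err != nil {
		log.Fatal(err)
	}

	// For an http(s) bucket URL this returns an HttpBucket; anything else
	// falls back to gocloud.dev via BucketAdapter.
	bucket, err := pmtiles.OpenBucket(ctx, bucketURL, "")
	if err != nil {
		log.Fatal(err)
	}
	defer bucket.Close()

	// Requests the first HEADERV3_LEN_BYTES bytes of the archive with an
	// HTTP Range header ("bytes=0-<length-1>").
	r, err := bucket.NewRangeReader(ctx, key, 0, pmtiles.HEADERV3_LEN_BYTES)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	headerBytes, err := io.ReadAll(r)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d header bytes\n", len(headerBytes))
}

The same three calls cover file:// and s3:// inputs unchanged, since OpenBucket only special-cases URLs starting with "http".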
35 changes: 35 additions & 0 deletions pmtiles/bucket_test.go
@@ -0,0 +1,35 @@
package pmtiles

import (
"github.com/stretchr/testify/assert"
"testing"
"strings"
"fmt"
)


func TestNormalizeLocalFile(t *testing.T) {
bucket, key, _ := NormalizeBucketKey("", "", "../foo/bar.pmtiles")
assert.Equal(t, "bar.pmtiles", key)
assert.True(t, strings.HasSuffix(bucket, "/foo"))
assert.True(t, strings.HasPrefix(bucket, "file://"))
}

func TestNormalizeHttp(t *testing.T) {
bucket, key, _ := NormalizeBucketKey("", "", "http://example.com/foo/bar.pmtiles")
assert.Equal(t, "bar.pmtiles", key)
assert.Equal(t, "http://example.com/foo", bucket)
}

func TestNormalizeAwsSdkVersion(t *testing.T) {
bucket, key, _ := NormalizeBucketKey("s3://mybucket?awssdk=v1&endpoint=https://foo.bar", "", "abc")
assert.Equal(t, "abc", key)
assert.Equal(t, "s3://mybucket?awssdk=v2&endpoint=https%3A%2F%2Ffoo.bar", bucket)
}
func TestNormalizePathPrefixServer(t *testing.T) {
bucket, key, _ := NormalizeBucketKey("", "../foo", "")
assert.Equal(t, "", key)
fmt.Println(bucket)
assert.True(t, strings.HasSuffix(bucket, "/foo"))
assert.True(t, strings.HasPrefix(bucket, "file://"))
}
106 changes: 55 additions & 51 deletions pmtiles/extract.go
@@ -2,23 +2,19 @@ package pmtiles

import (
"bytes"
"context"
"container/list"
"context"
"fmt"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/dustin/go-humanize"
"github.com/paulmach/orb"
"github.com/paulmach/orb/geojson"
"github.com/schollz/progressbar/v3"
"gocloud.dev/blob"
"golang.org/x/sync/errgroup"
"io"
"io/ioutil"
"log"
"math"
"os"
"sort"
"strings"
"sync"
"time"
)
@@ -252,31 +248,34 @@ func MergeRanges(ranges []SrcDstRange, overfetch float32) (*list.List, uint64) {
// 10. write the leaf directories (if any)
// 11. Get all tiles, and write directly to the output.

func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, region_file string, output string, download_threads int, overfetch float32, dry_run bool) error {
func Extract(logger *log.Logger, bucketURL string, key string, maxzoom int8, region_file string, output string, download_threads int, overfetch float32, dry_run bool) error {
// 1. fetch the header

if bucketURL == "" {
if strings.HasPrefix(file, "/") {
bucketURL = "file:///"
} else {
bucketURL = "file://"
}
}

fmt.Println("WARNING: extract is an experimental feature and results may not be suitable for production use.")
start := time.Now()

ctx := context.Background()
bucket, err := blob.OpenBucket(ctx, bucketURL)

bucketURL, key, err := NormalizeBucketKey(bucketURL, "", key)

if err != nil {
return err
}

bucket, err := OpenBucket(ctx, bucketURL, "")

if err != nil {
return err
}

if err != nil {
return fmt.Errorf("Failed to open bucket for %s, %w", bucketURL, err)
}
defer bucket.Close()

r, err := bucket.NewRangeReader(ctx, file, 0, HEADERV3_LEN_BYTES, nil)
r, err := bucket.NewRangeReader(ctx, key, 0, HEADERV3_LEN_BYTES)

if err != nil {
return fmt.Errorf("Failed to create range reader for %s, %w", file, err)
return fmt.Errorf("Failed to create range reader for %s, %w", key, err)
}
b, err := io.ReadAll(r)
if err != nil {
@@ -293,35 +292,46 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
source_metadata_offset := header.MetadataOffset
source_tile_data_offset := header.TileDataOffset

if header.MaxZoom < maxzoom || maxzoom == 0 {
maxzoom = header.MaxZoom
}

// 2. construct a relevance bitmap
dat, _ := ioutil.ReadFile(region_file)
f, _ := geojson.UnmarshalFeature(dat)
fmt.Println(maxzoom)

var multipolygon orb.MultiPolygon
switch v := f.Geometry.(type) {
case orb.Polygon:
multipolygon = []orb.Polygon{v}
case orb.MultiPolygon:
multipolygon = v
if maxzoom == -1 || int8(header.MaxZoom) < maxzoom {
maxzoom = int8(header.MaxZoom)
}

bound := multipolygon.Bound()
var relevant_set *roaring64.Bitmap
if region_file != "" {

// 2. construct a relevance bitmap
dat, _ := ioutil.ReadFile(region_file)
multipolygon, err := UnmarshalRegion(dat)

boundary_set, interior_set := bitmapMultiPolygon(maxzoom, multipolygon)
if err != nil {
return err
}

relevant_set := boundary_set
relevant_set.Or(interior_set)
generalizeOr(relevant_set)
bound := multipolygon.Bound()

boundary_set, interior_set := bitmapMultiPolygon(uint8(maxzoom), multipolygon)
relevant_set = boundary_set
relevant_set.Or(interior_set)
generalizeOr(relevant_set)

header.MinLonE7 = int32(bound.Left() * 10000000)
header.MinLatE7 = int32(bound.Bottom() * 10000000)
header.MaxLonE7 = int32(bound.Right() * 10000000)
header.MaxLatE7 = int32(bound.Top() * 10000000)
header.CenterLonE7 = int32(bound.Center().X() * 10000000)
header.CenterLatE7 = int32(bound.Center().Y() * 10000000)
} else {
relevant_set = roaring64.New()
relevant_set.AddRange(0, ZxyToId(uint8(maxzoom)+1, 0, 0))
}

// 3. get relevant entries from root
dir_offset := header.RootOffset
dir_length := header.RootLength

root_reader, err := bucket.NewRangeReader(ctx, file, int64(dir_offset), int64(dir_length), nil)
root_reader, err := bucket.NewRangeReader(ctx, key, int64(dir_offset), int64(dir_length))
if err != nil {
return err
}
@@ -333,7 +343,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r

root_dir := deserialize_entries(bytes.NewBuffer(root_bytes))

tile_entries, leaves := RelevantEntries(relevant_set, maxzoom, root_dir)
tile_entries, leaves := RelevantEntries(relevant_set, uint8(maxzoom), root_dir)

// 4. get all relevant leaf entries

@@ -352,7 +362,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
}
or := overfetch_leaves.Remove(overfetch_leaves.Front()).(OverfetchRange)

slab_r, err := bucket.NewRangeReader(ctx, file, int64(or.Rng.SrcOffset), int64(or.Rng.Length), nil)
slab_r, err := bucket.NewRangeReader(ctx, key, int64(or.Rng.SrcOffset), int64(or.Rng.Length))
if err != nil {
return err
}
@@ -365,7 +375,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
return err
}
leafdir := deserialize_entries(bytes.NewBuffer(leaf_bytes))
new_entries, new_leaves := RelevantEntries(relevant_set, maxzoom, leafdir)
new_entries, new_leaves := RelevantEntries(relevant_set, uint8(maxzoom), leafdir)

if len(new_leaves) > 0 {
panic("This doesn't support leaf level 2+.")
@@ -412,13 +422,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
header.TileEntriesCount = uint64(len(tile_entries))
header.TileContentsCount = tile_contents

header.MinLonE7 = int32(bound.Left() * 10000000)
header.MinLatE7 = int32(bound.Bottom() * 10000000)
header.MaxLonE7 = int32(bound.Right() * 10000000)
header.MaxLatE7 = int32(bound.Top() * 10000000)
header.CenterLonE7 = int32(bound.Center().X() * 10000000)
header.CenterLatE7 = int32(bound.Center().Y() * 10000000)
header.MaxZoom = maxzoom
header.MaxZoom = uint8(maxzoom)

header_bytes := serialize_header(header)

@@ -446,7 +450,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
}

// 9. get and write the metadata
metadata_reader, err := bucket.NewRangeReader(ctx, file, int64(source_metadata_offset), int64(header.MetadataLength), nil)
metadata_reader, err := bucket.NewRangeReader(ctx, key, int64(source_metadata_offset), int64(header.MetadataLength))
if err != nil {
return err
}
@@ -472,7 +476,7 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
var mu sync.Mutex

downloadPart := func(or OverfetchRange) error {
tile_r, err := bucket.NewRangeReader(ctx, file, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length), nil)
tile_r, err := bucket.NewRangeReader(ctx, key, int64(source_tile_data_offset+or.Rng.SrcOffset), int64(or.Rng.Length))
if err != nil {
return err
}
@@ -533,9 +537,9 @@ func Extract(logger *log.Logger, bucketURL string, file string, maxzoom uint8, r
}

fmt.Printf("Completed in %v with %v download threads.\n", time.Since(start), download_threads)
total_requests := 2 // header + root
total_requests := 2 // header + root
total_requests += num_overfetch_leaves // leaves
total_requests += 1 // metadata
total_requests += 1 // metadata
total_requests += num_overfetch_ranges
fmt.Printf("Extract required %d total requests.\n", total_requests)
fmt.Printf("Extract transferred %s (overfetch %v) for an archive size of %s\n", humanize.Bytes(total_bytes), overfetch, humanize.Bytes(total_actual_bytes))