Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set etag on pmtiles serve responses #137

Merged
merged 9 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions caddy/pmtiles_proxy.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package caddy

Check warning on line 1 in caddy/pmtiles_proxy.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

should have a package comment

import (
"bytes"
"fmt"
"io"
"log"
"net/http"
"strconv"
"time"

"github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
"github.com/caddyserver/caddy/v2/caddyconfig/httpcaddyfile"
"github.com/caddyserver/caddy/v2/modules/caddyhttp"
"github.com/protomaps/go-pmtiles/pmtiles"
"go.uber.org/zap"
_ "gocloud.dev/blob/azureblob"

Check warning on line 18 in caddy/pmtiles_proxy.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

a blank import should be only in a main or test package, or have a comment justifying it
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/s3blob"
"io"
"log"
"net/http"
"strconv"
"time"
)

func init() {
Expand All @@ -29,6 +31,7 @@
Bucket string `json:"bucket"`
CacheSize int `json:"cache_size"`
PublicURL string `json:"public_url"`
TileEtag bool `json:"tile_etag"`
logger *zap.Logger
server *pmtiles.Server
}
Expand All @@ -41,11 +44,11 @@
}
}

func (m *Middleware) Provision(ctx caddy.Context) error {

Check warning on line 47 in caddy/pmtiles_proxy.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

exported method Middleware.Provision should have comment or be unexported
m.logger = ctx.Logger()
logger := log.New(io.Discard, "", log.Ldate)
prefix := "." // serve only the root of the bucket for now, at the root route of Caddyfile
server, err := pmtiles.NewServer(m.Bucket, prefix, logger, m.CacheSize, "", m.PublicURL)
server, err := pmtiles.NewServer(m.Bucket, prefix, logger, m.CacheSize, "", m.PublicURL, m.TileEtag)
if err != nil {
return err
}
Expand All @@ -54,7 +57,7 @@
return nil
}

func (m *Middleware) Validate() error {

Check warning on line 60 in caddy/pmtiles_proxy.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

exported method Middleware.Validate should have comment or be unexported
if m.Bucket == "" {
return fmt.Errorf("no bucket")
}
Expand All @@ -70,14 +73,18 @@
for k, v := range headers {
w.Header().Set(k, v)
}
w.WriteHeader(statusCode)
w.Write(body)
if statusCode == 200 {
http.ServeContent(w, r, "", time.UnixMilli(0), bytes.NewReader(body))
} else {
w.WriteHeader(statusCode)
w.Write(body)
}
msbarry marked this conversation as resolved.
Show resolved Hide resolved
m.logger.Info("response", zap.Int("status", statusCode), zap.String("path", r.URL.Path), zap.Duration("duration", time.Since(start)))

return next.ServeHTTP(w, r)
}

func (m *Middleware) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {

Check warning on line 87 in caddy/pmtiles_proxy.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

exported method Middleware.UnmarshalCaddyfile should have comment or be unexported
for d.Next() {
for nesting := d.Nesting(); d.NextBlock(nesting); {
switch d.Val() {
Expand Down
18 changes: 15 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

Check warning on line 1 in main.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

should have a package comment

import (
"bytes"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -88,6 +89,7 @@
CacheSize int `default:"64" help:"Size of cache in Megabytes."`
Bucket string `help:"Remote bucket"`
PublicURL string `help:"Public base URL of tile endpoint for TileJSON e.g. https://example.com/tiles/"`
TileEtag bool `help:"Generate etag for each tile instead of using archive etag"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I implemented this it also applies to metadata json and tilejson responses. Should I remove those? Or make this --resource-etag? or just leave it as-is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also in my tests on a 32-core machine enabling this reduces throughput from 40k tiles per second to 36k tiles per second.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe --unique-etags instead, to make it clearer the effect?

} `cmd:"" help:"Run an HTTP proxy server for Z/X/Y tiles."`

Download struct {
Expand Down Expand Up @@ -130,7 +132,7 @@
logger.Fatalf("Failed to show tile, %v", err)
}
case "serve <path>":
server, err := pmtiles.NewServer(cli.Serve.Bucket, cli.Serve.Path, logger, cli.Serve.CacheSize, cli.Serve.Cors, cli.Serve.PublicURL)
server, err := pmtiles.NewServer(cli.Serve.Bucket, cli.Serve.Path, logger, cli.Serve.CacheSize, cli.Serve.Cors, cli.Serve.PublicURL, cli.Serve.TileEtag)

if err != nil {
logger.Fatalf("Failed to create new server, %v", err)
Expand All @@ -144,8 +146,18 @@
for k, v := range headers {
w.Header().Set(k, v)
}
w.WriteHeader(statusCode)
w.Write(body)
if statusCode == 200 {
// handle if-match, if-none-match request headers based on response etag
http.ServeContent(
w, r,
"", // name used to infer content-type, but we've already set that
time.UnixMilli(0), // ignore setting last-modified time and handling if-modified-since headers
bytes.NewReader(body),
)
} else {
w.WriteHeader(statusCode)
w.Write(body)
}
logger.Printf("served %d %s in %s", statusCode, r.URL.Path, time.Since(start))
})

Expand Down
8 changes: 6 additions & 2 deletions pmtiles/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,17 @@
path string
}

func (b FileBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {

Check warning on line 75 in pmtiles/bucket.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

exported method FileBucket.NewRangeReader should have comment or be unexported
body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}

func generateEtag(data []byte) string {
hash := md5.Sum([]byte(data))
return fmt.Sprintf(`"%s"`, hex.EncodeToString(hash[:]))
}

func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {

Check warning on line 85 in pmtiles/bucket.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

exported method FileBucket.NewRangeReaderEtag should have comment or be unexported
name := filepath.Join(b.path, key)
file, err := os.Open(name)
defer file.Close()
Expand All @@ -89,8 +94,7 @@
return nil, "", err
}
modInfo := fmt.Sprintf("%d %d", info.ModTime().UnixNano(), info.Size())
hash := md5.Sum([]byte(modInfo))
newEtag := fmt.Sprintf(`"%s"`, hex.EncodeToString(hash[:]))
newEtag := generateEtag([]byte(modInfo))
if len(etag) > 0 && etag != newEtag {
return nil, "", &RefreshRequiredError{}
}
Expand Down
53 changes: 38 additions & 15 deletions pmtiles/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@ type Server struct {
cacheSize int
cors string
publicURL string
tileEtag bool
}

var emptyData = make([]byte, 0)
var emptyEtag = generateEtag(emptyData)

// NewServer creates a new pmtiles HTTP server.
func NewServer(bucketURL string, prefix string, logger *log.Logger, cacheSize int, cors string, publicURL string) (*Server, error) {
func NewServer(bucketURL string, prefix string, logger *log.Logger, cacheSize int, cors string, publicURL string, tileEtag bool) (*Server, error) {

ctx := context.Background()

Expand All @@ -70,11 +74,11 @@ func NewServer(bucketURL string, prefix string, logger *log.Logger, cacheSize in
return nil, err
}

return NewServerWithBucket(bucket, prefix, logger, cacheSize, cors, publicURL)
return NewServerWithBucket(bucket, prefix, logger, cacheSize, cors, publicURL, tileEtag)
}

// NewServerWithBucket creates a new HTTP server for a gocloud Bucket.
func NewServerWithBucket(bucket Bucket, _ string, logger *log.Logger, cacheSize int, cors string, publicURL string) (*Server, error) {
func NewServerWithBucket(bucket Bucket, _ string, logger *log.Logger, cacheSize int, cors string, publicURL string, tileEtag bool) (*Server, error) {

reqs := make(chan request, 8)

Expand All @@ -85,6 +89,7 @@ func NewServerWithBucket(bucket Bucket, _ string, logger *log.Logger, cacheSize
cacheSize: cacheSize,
cors: cors,
publicURL: publicURL,
tileEtag: tileEtag,
}

return l, nil
Expand Down Expand Up @@ -229,30 +234,30 @@ func (server *Server) Start() {
}()
}

func (server *Server) getHeaderMetadata(ctx context.Context, name string) (bool, HeaderV3, []byte, error) {
found, header, metadataBytes, purgeEtag, err := server.getHeaderMetadataAttempt(ctx, name, "")
func (server *Server) getHeaderMetadata(ctx context.Context, name string) (bool, HeaderV3, []byte, string, error) {
found, header, metadataBytes, purgeEtag, newEtag, err := server.getHeaderMetadataAttempt(ctx, name, "")
if len(purgeEtag) > 0 {
found, header, metadataBytes, _, err = server.getHeaderMetadataAttempt(ctx, name, purgeEtag)
found, header, metadataBytes, _, newEtag, err = server.getHeaderMetadataAttempt(ctx, name, purgeEtag)
}
return found, header, metadataBytes, err
return found, header, metadataBytes, newEtag, err
}

func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeEtag string) (bool, HeaderV3, []byte, string, error) {
func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeEtag string) (bool, HeaderV3, []byte, string, string, error) {
rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), purgeEtag: purgeEtag}
server.reqs <- rootReq
rootValue := <-rootReq.value
header := rootValue.header

if !rootValue.ok {
return false, HeaderV3{}, nil, "", nil
return false, HeaderV3{}, nil, "", rootValue.etag, nil
}

r, _, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag)
if isRefreshRequredError(err) {
return false, HeaderV3{}, nil, rootValue.etag, nil
return false, HeaderV3{}, nil, rootValue.etag, rootValue.etag, nil
}
if err != nil {
return false, HeaderV3{}, nil, "", nil
return false, HeaderV3{}, nil, "", rootValue.etag, nil
}
defer r.Close()

Expand All @@ -264,14 +269,14 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE
} else if header.InternalCompression == NoCompression {
metadataBytes, err = io.ReadAll(r)
} else {
return true, HeaderV3{}, nil, "", errors.New("unknown compression")
return true, HeaderV3{}, nil, "", "", errors.New("unknown compression")
}

return true, header, metadataBytes, "", nil
return true, header, metadataBytes, "", rootValue.etag, nil
}

func (server *Server) getTileJSON(ctx context.Context, httpHeaders map[string]string, name string) (int, map[string]string, []byte) {
found, header, metadataBytes, err := server.getHeaderMetadata(ctx, name)
found, header, metadataBytes, etag, err := server.getHeaderMetadata(ctx, name)

if err != nil {
return 500, httpHeaders, []byte("I/O Error")
Expand All @@ -295,11 +300,17 @@ func (server *Server) getTileJSON(ctx context.Context, httpHeaders map[string]st

httpHeaders["Content-Type"] = "application/json"

if server.tileEtag {
httpHeaders["Etag"] = etag
} else {
httpHeaders["Etag"] = generateEtag(tilejsonBytes)
}

return 200, httpHeaders, tilejsonBytes
}

func (server *Server) getMetadata(ctx context.Context, httpHeaders map[string]string, name string) (int, map[string]string, []byte) {
found, _, metadataBytes, err := server.getHeaderMetadata(ctx, name)
found, _, metadataBytes, etag, err := server.getHeaderMetadata(ctx, name)

if err != nil {
return 500, httpHeaders, []byte("I/O Error")
Expand All @@ -310,6 +321,11 @@ func (server *Server) getMetadata(ctx context.Context, httpHeaders map[string]st
}

httpHeaders["Content-Type"] = "application/json"
if server.tileEtag {
httpHeaders["Etag"] = etag
} else {
httpHeaders["Etag"] = generateEtag(metadataBytes)
}
return 200, httpHeaders, metadataBytes
}
func (server *Server) getTile(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string) (int, map[string]string, []byte) {
Expand All @@ -320,6 +336,7 @@ func (server *Server) getTile(ctx context.Context, httpHeaders map[string]string
}
return status, headers, data
}

func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string]string, name string, z uint8, x uint32, y uint32, ext string, purgeEtag string) (int, map[string]string, []byte, string) {
rootReq := request{key: cacheKey{name: name, offset: 0, length: 0}, value: make(chan cachedValue, 1), purgeEtag: purgeEtag}
bdon marked this conversation as resolved.
Show resolved Hide resolved
server.reqs <- rootReq
Expand Down Expand Up @@ -390,6 +407,12 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string
if err != nil {
return 500, httpHeaders, []byte("I/O error"), ""
}

if server.tileEtag {
httpHeaders["Etag"] = generateEtag(b)
} else {
httpHeaders["Etag"] = rootValue.etag
}
if headerVal, ok := headerContentType(header); ok {
httpHeaders["Content-Type"] = headerVal
}
Expand Down
87 changes: 86 additions & 1 deletion pmtiles/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,12 @@ func fakeArchive(t *testing.T, header HeaderV3, metadata map[string]interface{},
}

func newServer(t *testing.T) (mockBucket, *Server) {
return newServerWithTileEtags(t, false)
}

func newServerWithTileEtags(t *testing.T, tileEtags bool) (mockBucket, *Server) {
bucket := mockBucket{make(map[string][]byte)}
server, err := NewServerWithBucket(bucket, "", log.Default(), 10, "", "tiles.example.com")
server, err := NewServerWithBucket(bucket, "", log.Default(), 10, "", "tiles.example.com", tileEtags)
assert.Nil(t, err)
server.Start()
return bucket, server
Expand Down Expand Up @@ -370,3 +374,84 @@ func TestInvalidateCacheOnMetadataRequest(t *testing.T) {
"meta": "data2"
}`, string(data))
}

func TestEtagResponsesFromArchive(t *testing.T) {
mockBucket, server := newServerWithTileEtags(t, false)
header := HeaderV3{
TileType: Mvt,
}
mockBucket.items["archive.pmtiles"] = fakeArchive(t, header, map[string]interface{}{}, map[Zxy][]byte{
{0, 0, 0}: {0, 1, 2, 3},
{4, 1, 2}: {1, 2, 3},
}, false)

statusCode, headers000v1, _ := server.Get(context.Background(), "/archive/0/0/0.mvt")
assert.Equal(t, 200, statusCode)
statusCode, headers412v1, _ := server.Get(context.Background(), "/archive/4/1/2.mvt")
assert.Equal(t, 200, statusCode)
statusCode, headers311v1, _ := server.Get(context.Background(), "/archive/3/1/1.mvt")
assert.Equal(t, 204, statusCode)

mockBucket.items["archive.pmtiles"] = fakeArchive(t, header, map[string]interface{}{}, map[Zxy][]byte{
{0, 0, 0}: {0, 1, 2, 3},
{4, 1, 2}: {1, 2, 3, 4}, // different
}, false)

statusCode, headers000v2, _ := server.Get(context.Background(), "/archive/0/0/0.mvt")
assert.Equal(t, 200, statusCode)
statusCode, headers412v2, _ := server.Get(context.Background(), "/archive/4/1/2.mvt")
assert.Equal(t, 200, statusCode)
statusCode, headers311v2, _ := server.Get(context.Background(), "/archive/3/1/1.mvt")
assert.Equal(t, 204, statusCode)

assert.Equal(t, headers000v1["Etag"], headers412v1["Etag"])
assert.NotEqual(t, headers000v1["Etag"], headers000v2["Etag"])
assert.Equal(t, headers000v2["Etag"], headers412v2["Etag"])

assert.Equal(t, "", headers311v1["Etag"])
assert.Equal(t, "", headers311v2["Etag"])
}

func TestEtagResponsesFromTile(t *testing.T) {
mockBucket, server := newServerWithTileEtags(t, true)
header := HeaderV3{
TileType: Mvt,
}
mockBucket.items["archive.pmtiles"] = fakeArchive(t, header, map[string]interface{}{}, map[Zxy][]byte{
{0, 0, 0}: {0, 1, 2, 3},
{4, 1, 2}: {1, 2, 3},
}, false)

statusCode, headers000v1, _ := server.Get(context.Background(), "/archive/0/0/0.mvt")
assert.Equal(t, 200, statusCode)
statusCode, headers412v1, _ := server.Get(context.Background(), "/archive/4/1/2.mvt")
assert.Equal(t, 200, statusCode)
statusCode, headers311v1, _ := server.Get(context.Background(), "/archive/3/1/1.mvt")
assert.Equal(t, 204, statusCode)

mockBucket.items["archive.pmtiles"] = fakeArchive(t, header, map[string]interface{}{}, map[Zxy][]byte{
{0, 0, 0}: {0, 1, 2, 3},
{4, 1, 2}: {1, 2, 3, 4}, // different
}, false)

statusCode, headers000v2, _ := server.Get(context.Background(), "/archive/0/0/0.mvt")
assert.Equal(t, 200, statusCode)
statusCode, headers412v2, _ := server.Get(context.Background(), "/archive/4/1/2.mvt")
assert.Equal(t, 200, statusCode)
statusCode, headers311v2, _ := server.Get(context.Background(), "/archive/3/1/1.mvt")
assert.Equal(t, 204, statusCode)

// 204's have no etag
assert.Equal(t, "", headers311v1["Etag"])
assert.Equal(t, "", headers311v2["Etag"])

// 000 and 311 didn't change
assert.Equal(t, headers000v1["Etag"], headers000v2["Etag"])

// 412 did change
assert.NotEqual(t, headers412v1["Etag"], headers412v2["Etag"])

// all are different
assert.NotEqual(t, headers000v1["Etag"], headers311v1["Etag"])
assert.NotEqual(t, headers000v1["Etag"], headers412v1["Etag"])
}
Loading