Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more prometheus metrics #141

Merged
merged 10 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main

Check warning on line 1 in main.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

should have a package comment

import (
"fmt"
Expand Down Expand Up @@ -136,6 +136,7 @@
logger.Fatalf("Failed to create new server, %v", err)
}

pmtiles.SetBuildInfo(version, commit, date)
server.Start()

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand Down
59 changes: 32 additions & 27 deletions pmtiles/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
type Bucket interface {
Close() error
NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error)
NewRangeReaderEtag(ctx context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, error)
NewRangeReaderEtag(ctx context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error)
}

// RefreshRequiredError is an error that indicates the etag has chanced on the remote file
Expand All @@ -46,25 +46,25 @@
}

func (m mockBucket) NewRangeReader(ctx context.Context, key string, offset int64, length int64) (io.ReadCloser, error) {
body, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "")
body, _, _, err := m.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err

}
func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, error) {
func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int64, length int64, etag string) (io.ReadCloser, string, int, error) {
bs, ok := m.items[key]
if !ok {
return nil, "", fmt.Errorf("Not found %s", key)
return nil, "", 404, fmt.Errorf("Not found %s", key)
}

resultEtag := generateEtag(bs)
if len(etag) > 0 && resultEtag != etag {
return nil, "", &RefreshRequiredError{}
return nil, "", 412, &RefreshRequiredError{}
}
if offset+length > int64(len(bs)) {
return nil, "", &RefreshRequiredError{416}
return nil, "", 416, &RefreshRequiredError{416}
}

return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, nil
return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, 206, nil
}

// FileBucket is a bucket backed by a directory on disk
Expand All @@ -72,8 +72,8 @@
path string
}

func (b FileBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {

Check warning on line 75 in pmtiles/bucket.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

exported method FileBucket.NewRangeReader should have comment or be unexported
body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
body, _, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}

Expand Down Expand Up @@ -102,33 +102,33 @@
return hasherToEtag(hasher)
}

func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
name := filepath.Join(b.path, key)
file, err := os.Open(name)
defer file.Close()
if err != nil {
return nil, "", err
return nil, "", 404, err
}
info, err := file.Stat()
if err != nil {
return nil, "", err
return nil, "", 404, err
}
newEtag := generateEtagFromInts(info.ModTime().UnixNano(), info.Size())
if len(etag) > 0 && etag != newEtag {
return nil, "", &RefreshRequiredError{}
return nil, "", 412, &RefreshRequiredError{}
}
result := make([]byte, length)
read, err := file.ReadAt(result, offset)
if err != nil {
return nil, "", err
return nil, "", 500, err
}
if read != int(length) {
return nil, "", fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
return nil, "", 416, fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
}
return io.NopCloser(bytes.NewReader(result)), newEtag, nil
return io.NopCloser(bytes.NewReader(result)), newEtag, 206, nil
}

func (b FileBucket) Close() error {

Check warning on line 131 in pmtiles/bucket.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

exported method FileBucket.Close should have comment or be unexported
return nil
}

Expand All @@ -143,16 +143,16 @@
}

func (b HTTPBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
body, _, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}

func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
reqURL := b.baseURL + "/" + key

req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
if err != nil {
return nil, "", err
return nil, "", 500, err
}

req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
Expand All @@ -162,7 +162,7 @@

resp, err := b.client.Do(req)
if err != nil {
return nil, "", err
return nil, "", resp.StatusCode, err
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
Expand All @@ -172,10 +172,10 @@
} else {
err = fmt.Errorf("HTTP error: %d", resp.StatusCode)
}
return nil, "", err
return nil, "", resp.StatusCode, err
}

return resp.Body, resp.Header.Get("ETag"), nil
return resp.Body, resp.Header.Get("ETag"), resp.StatusCode, nil
}

func (b HTTPBucket) Close() error {
Expand All @@ -191,11 +191,11 @@
}

func (ba BucketAdapter) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
body, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
body, _, _, err := ba.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}

func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {
reader, err := ba.Bucket.NewRangeReader(ctx, key, offset, length, &blob.ReaderOptions{
BeforeRead: func(asFunc func(interface{}) bool) error {
var req *s3.GetObjectInput
Expand All @@ -205,20 +205,25 @@
return nil
},
})
status := 206
if err != nil {
var resp awserr.RequestFailure
errors.As(err, &resp)
if resp != nil && isRefreshRequredCode(resp.StatusCode()) {
return nil, "", &RefreshRequiredError{resp.StatusCode()}
status = 404
if resp != nil {
status = resp.StatusCode()
if isRefreshRequredCode(resp.StatusCode()) {
return nil, "", resp.StatusCode(), &RefreshRequiredError{resp.StatusCode()}
}
}
return nil, "", err
return nil, "", status, err
}
resultETag := ""
var resp s3.GetObjectOutput
if reader.As(&resp) {
resultETag = *resp.ETag
}
return reader, resultETag, nil
return reader, resultETag, status, nil
}

func (ba BucketAdapter) Close() error {
Expand Down
33 changes: 22 additions & 11 deletions pmtiles/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ func TestHttpBucketRequestNormal(t *testing.T) {
Body: io.NopCloser(strings.NewReader("abc")),
Header: header,
}
data, etag, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 100, 3, "")
data, etag, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 100, 3, "")
assert.Equal(t, "", mock.request.Header.Get("If-Match"))
assert.Equal(t, "bytes=100-102", mock.request.Header.Get("Range"))
assert.Equal(t, "http://tiles.example.com/tiles/a/b/c", mock.request.URL.String())
assert.Equal(t, 200, status)
assert.Nil(t, err)
b, err := io.ReadAll(data)
assert.Nil(t, err)
Expand All @@ -85,8 +86,9 @@ func TestHttpBucketRequestRequestEtag(t *testing.T) {
Body: io.NopCloser(strings.NewReader("abc")),
Header: header,
}
data, etag, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
data, etag, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, "etag1", mock.request.Header.Get("If-Match"))
assert.Equal(t, 200, status)
assert.Nil(t, err)
b, err := io.ReadAll(data)
assert.Nil(t, err)
Expand All @@ -105,17 +107,20 @@ func TestHttpBucketRequestRequestEtagFailed(t *testing.T) {
Body: io.NopCloser(strings.NewReader("abc")),
Header: header,
}
_, _, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
_, _, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, "etag1", mock.request.Header.Get("If-Match"))
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))

mock.response.StatusCode = 416
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, 416, status)
assert.True(t, isRefreshRequredError(err))

mock.response.StatusCode = 404
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.False(t, isRefreshRequredError(err))
assert.Equal(t, 404, status)
}

func TestFileBucketReplace(t *testing.T) {
Expand All @@ -129,23 +134,26 @@ func TestFileBucketReplace(t *testing.T) {
assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))

// first read from file
reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
reader, etag1, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err := io.ReadAll(reader)
assert.Nil(t, err)
assert.Equal(t, []byte{2}, data)

// change file, verify etag changes
assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{4, 5, 6, 7}, 0666))
reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
reader, etag2, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err = io.ReadAll(reader)
assert.Nil(t, err)
assert.NotEqual(t, etag1, etag2)
assert.Equal(t, []byte{5}, data)

// and requesting with old etag fails with refresh required error
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
}

Expand All @@ -163,7 +171,8 @@ func TestFileBucketRename(t *testing.T) {
assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))

// first read from file
reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
reader, etag1, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err := io.ReadAll(reader)
assert.Nil(t, err)
Expand All @@ -172,14 +181,16 @@ func TestFileBucketRename(t *testing.T) {
// change file, verify etag changes
os.Rename(filepath.Join(tmp, "archive.pmtiles"), filepath.Join(tmp, "archive3.pmtiles"))
os.Rename(filepath.Join(tmp, "archive2.pmtiles"), filepath.Join(tmp, "archive.pmtiles"))
reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
reader, etag2, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err = io.ReadAll(reader)
assert.Nil(t, err)
assert.NotEqual(t, etag1, etag2)
assert.Equal(t, []byte{5}, data)

// and requesting with old etag fails with refresh required error
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
}
Loading
Loading