Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 16k file bucket #143

Merged
merged 4 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions pmtiles/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@
if len(etag) > 0 && resultEtag != etag {
return nil, "", 412, &RefreshRequiredError{}
}
if offset+length > int64(len(bs)) {
if offset > int64(len(bs)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be >=

return nil, "", 416, &RefreshRequiredError{416}
}

return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, 206, nil
end := offset + length
if end > int64(len(bs)) {
end = int64(len(bs))
}
return io.NopCloser(bytes.NewReader(bs[offset:end])), resultEtag, 206, nil
}

// FileBucket is a bucket backed by a directory on disk
Expand All @@ -77,7 +81,7 @@
return &FileBucket{path: path}
}

func (b FileBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {

Check warning on line 84 in pmtiles/bucket.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

exported method FileBucket.NewRangeReader should have comment or be unexported
body, _, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
return body, err
}
Expand Down Expand Up @@ -107,7 +111,7 @@
return hasherToEtag(hasher)
}

func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, int, error) {

Check warning on line 114 in pmtiles/bucket.go

View workflow job for this annotation

GitHub Actions / fmt_vet_lint

exported method FileBucket.NewRangeReaderEtag should have comment or be unexported
name := filepath.Join(b.path, key)
file, err := os.Open(name)
defer file.Close()
Expand All @@ -124,12 +128,19 @@
}
result := make([]byte, length)
read, err := file.ReadAt(result, offset)

if err == io.EOF {
part := result[0:read]
return io.NopCloser(bytes.NewReader(part)), newEtag, 206, nil
}

if err != nil {
return nil, "", 500, err
}
if read != int(length) {
return nil, "", 416, fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
}

return io.NopCloser(bytes.NewReader(result)), newEtag, 206, nil
}

Expand Down Expand Up @@ -172,7 +183,7 @@

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close()
if isRefreshRequredCode(resp.StatusCode) {
if isRefreshRequiredCode(resp.StatusCode) {
err = &RefreshRequiredError{resp.StatusCode}
} else {
err = fmt.Errorf("HTTP error: %d", resp.StatusCode)
Expand All @@ -187,7 +198,7 @@
return nil
}

func isRefreshRequredCode(code int) bool {
func isRefreshRequiredCode(code int) bool {
return code == http.StatusPreconditionFailed || code == http.StatusRequestedRangeNotSatisfiable
}

Expand Down Expand Up @@ -217,7 +228,7 @@
status = 404
if resp != nil {
status = resp.StatusCode()
if isRefreshRequredCode(resp.StatusCode()) {
if isRefreshRequiredCode(resp.StatusCode()) {
return nil, "", resp.StatusCode(), &RefreshRequiredError{resp.StatusCode()}
}
}
Expand Down
28 changes: 20 additions & 8 deletions pmtiles/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pmtiles

import (
"context"
"fmt"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -110,24 +109,23 @@ func TestHttpBucketRequestRequestEtagFailed(t *testing.T) {
_, _, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, "etag1", mock.request.Header.Get("If-Match"))
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
assert.True(t, isRefreshRequiredError(err))

mock.response.StatusCode = 416
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, 416, status)
assert.True(t, isRefreshRequredError(err))
assert.True(t, isRefreshRequiredError(err))

mock.response.StatusCode = 404
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.False(t, isRefreshRequredError(err))
assert.False(t, isRefreshRequiredError(err))
assert.Equal(t, 404, status)
}

func TestFileBucketReplace(t *testing.T) {
tmp := t.TempDir()
bucketURL, _, err := NormalizeBucketKey("", tmp, "")
assert.Nil(t, err)
fmt.Println(bucketURL)
bucket, err := OpenBucket(context.Background(), bucketURL, "")
assert.Nil(t, err)
assert.NotNil(t, bucket)
Expand All @@ -154,7 +152,7 @@ func TestFileBucketReplace(t *testing.T) {
// and requesting with old etag fails with refresh required error
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
assert.True(t, isRefreshRequiredError(err))
}

func TestFileBucketRename(t *testing.T) {
Expand All @@ -164,7 +162,6 @@ func TestFileBucketRename(t *testing.T) {

bucketURL, _, err := NormalizeBucketKey("", tmp, "")
assert.Nil(t, err)
fmt.Println(bucketURL)
bucket, err := OpenBucket(context.Background(), bucketURL, "")
assert.Nil(t, err)
assert.NotNil(t, bucket)
Expand Down Expand Up @@ -192,5 +189,20 @@ func TestFileBucketRename(t *testing.T) {
// and requesting with old etag fails with refresh required error
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
assert.True(t, isRefreshRequiredError(err))
}

func TestFileShorterThan16K(t *testing.T) {
tmp := t.TempDir()
assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))

bucketURL, _, err := NormalizeBucketKey("", tmp, "")
bucket, err := OpenBucket(context.Background(), bucketURL, "")

reader, _, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 0, 16384, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err := io.ReadAll(reader)
assert.Nil(t, err)
assert.Equal(t, 3, len(data))
}
8 changes: 4 additions & 4 deletions pmtiles/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (server *Server) Start() {

if err != nil {
ok = false
result.badEtag = isRefreshRequredError(err)
result.badEtag = isRefreshRequiredError(err)
resps <- response{key: key, value: result}
server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err)
return
Expand Down Expand Up @@ -256,7 +256,7 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE
defer func() { tracker.finish(ctx, status) }()
r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag)
status = strconv.Itoa(statusCode)
if isRefreshRequredError(err) {
if isRefreshRequiredError(err) {
return false, HeaderV3{}, nil, rootValue.etag, nil
}
if err != nil {
Expand Down Expand Up @@ -393,7 +393,7 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string
defer func() { tracker.finish(ctx, status) }()
r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag)
status = strconv.Itoa(statusCode)
if isRefreshRequredError(err) {
if isRefreshRequiredError(err) {
return 500, httpHeaders, []byte("I/O Error"), rootValue.etag
}
// possible we have the header/directory cached but the archive has disappeared
Expand Down Expand Up @@ -429,7 +429,7 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string
return 204, httpHeaders, nil, ""
}

func isRefreshRequredError(err error) bool {
func isRefreshRequiredError(err error) bool {
_, ok := err.(*RefreshRequiredError)
return ok
}
Expand Down
3 changes: 0 additions & 3 deletions pmtiles/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ func fakeArchive(t *testing.T, header HeaderV3, metadata map[string]interface{},
archiveBytes = append(archiveBytes, metadataBytes...)
archiveBytes = append(archiveBytes, leavesBytes...)
archiveBytes = append(archiveBytes, tileDataBytes...)
if len(archiveBytes) < 16384 {
archiveBytes = append(archiveBytes, make([]byte, 16384-len(archiveBytes))...)
}
return archiveBytes
}

Expand Down
Loading