Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement native file bucket with change detection #132

Merged
merged 11 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pmtiles/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,49 @@ func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int
return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, nil
}

// FileBucket is a bucket backed by a directory on disk.
// Keys passed to its reader methods are appended to path to locate a file.
type FileBucket struct {
// path is the directory prefix that keys are appended to when opening files.
path string
// file is not read by any FileBucket method visible here, and OpenBucket
// constructs the bucket with it set to nil.
// NOTE(review): looks unused — confirm before removing, since the struct is
// built with a positional literal.
file *os.File
}

// NewRangeReader returns a reader over length bytes of the file for key,
// starting at offset. It delegates to NewRangeReaderEtag with an empty etag,
// which skips the etag comparison, and drops the etag that call returns.
func (b FileBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
	reader, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
	return reader, err
}

func (b FileBucket) NewRangeReaderEtag(ctx context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
name := b.path + string(os.PathSeparator) + key
msbarry marked this conversation as resolved.
Show resolved Hide resolved
file, err := os.Open(name)
defer file.Close()
if err != nil {
return nil, "", err
}
info, err := file.Stat()
if err != nil {
return nil, "", err
}
modInfo := fmt.Sprintf("%d %d", info.ModTime().UnixNano(), info.Size())
hash := md5.Sum([]byte(modInfo))
newEtag := fmt.Sprintf(`"%s"`, hex.EncodeToString(hash[:]))
if len(etag) > 0 && etag != newEtag {
return nil, "", &RefreshRequiredError{}
}
result := make([]byte, length)
read, err := file.ReadAt(result, offset)
if err != nil {
return nil, "", err
}
if read != int(length) {
return nil, "", fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
}
return io.NopCloser(bytes.NewReader(result)), newEtag, nil
}

// Close always returns nil; it performs no other work.
func (b FileBucket) Close() error {
return nil
}

// HTTPClient is an interface that lets you swap out the default client with a mock one in tests
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
Expand Down Expand Up @@ -198,6 +241,14 @@ func OpenBucket(ctx context.Context, bucketURL string, bucketPrefix string) (Buc
bucket := HTTPBucket{bucketURL, http.DefaultClient}
return bucket, nil
}
if strings.HasPrefix(bucketURL, "file") {
url, err := url.ParseRequestURI(bucketURL)
if err != nil {
return nil, err
}
bucket := FileBucket{url.Path, nil}
return bucket, nil
}
bucket, err := blob.OpenBucket(ctx, bucketURL)
if err != nil {
return nil, err
Expand Down
68 changes: 68 additions & 0 deletions pmtiles/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package pmtiles

import (
"context"
"fmt"
"io"
"net/http"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
_ "gocloud.dev/blob/fileblob"
)

func TestNormalizeLocalFile(t *testing.T) {
Expand Down Expand Up @@ -114,3 +116,69 @@ func TestHttpBucketRequestRequestEtagFailed(t *testing.T) {
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.False(t, isRefreshRequredError(err))
}

// TestFileBucketReplace verifies that overwriting a file in place changes the
// etag reported by a file bucket, and that a read made with the stale etag
// fails with a refresh-required error.
func TestFileBucketReplace(t *testing.T) {
	dir := t.TempDir()
	archive := dir + "/archive.pmtiles"

	bucketURL, _, err := NormalizeBucketKey("", dir, "")
	assert.Nil(t, err)
	fmt.Println(bucketURL)
	bucket, err := OpenBucket(context.Background(), bucketURL, "")
	assert.Nil(t, err)
	// Stop here on failure: calling a method on a nil bucket would panic.
	if !assert.NotNil(t, bucket) {
		return
	}
	assert.Nil(t, os.WriteFile(archive, []byte{1, 2, 3}, 0666))

	// first read from file
	reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
	if !assert.Nil(t, err) {
		return
	}
	data, err := io.ReadAll(reader)
	assert.Nil(t, err)
	assert.Nil(t, reader.Close())
	assert.Equal(t, []byte{2}, data)

	// change file, verify etag changes; the new content has a different size
	// so the etag differs even if the filesystem's mtime resolution is too
	// coarse to separate the two writes
	assert.Nil(t, os.WriteFile(archive, []byte{4, 5, 6, 7}, 0666))
	reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
	if !assert.Nil(t, err) {
		return
	}
	data, err = io.ReadAll(reader)
	assert.Nil(t, err)
	assert.Nil(t, reader.Close())
	assert.NotEqual(t, etag1, etag2)
	assert.Equal(t, []byte{5}, data)

	// and requesting with old etag fails with refresh required error
	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
	assert.True(t, isRefreshRequredError(err))
}

// TestFileBucketRename verifies that swapping a different file into place via
// rename changes the etag reported by a file bucket, and that a read made with
// the stale etag fails with a refresh-required error.
func TestFileBucketRename(t *testing.T) {
	dir := t.TempDir()
	archive := dir + "/archive.pmtiles"
	replacement := dir + "/archive2.pmtiles"

	assert.Nil(t, os.WriteFile(archive, []byte{1, 2, 3}, 0666))
	// The replacement has a different size so the etag differs even if both
	// files end up with the same modification time.
	assert.Nil(t, os.WriteFile(replacement, []byte{4, 5, 6, 7}, 0666))

	bucketURL, _, err := NormalizeBucketKey("", dir, "")
	assert.Nil(t, err)
	fmt.Println(bucketURL)
	bucket, err := OpenBucket(context.Background(), bucketURL, "")
	assert.Nil(t, err)
	// Stop here on failure: calling a method on a nil bucket would panic.
	if !assert.NotNil(t, bucket) {
		return
	}

	// first read from file
	reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
	if !assert.Nil(t, err) {
		return
	}
	data, err := io.ReadAll(reader)
	assert.Nil(t, err)
	assert.Nil(t, reader.Close())
	assert.Equal(t, []byte{2}, data)

	// change file, verify etag changes
	assert.Nil(t, os.Rename(archive, dir+"/archive3.pmtiles"))
	assert.Nil(t, os.Rename(replacement, archive))
	reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
	if !assert.Nil(t, err) {
		return
	}
	data, err = io.ReadAll(reader)
	assert.Nil(t, err)
	assert.Nil(t, reader.Close())
	assert.NotEqual(t, etag1, etag2)
	assert.Equal(t, []byte{5}, data)

	// and requesting with old etag fails with refresh required error
	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
	assert.True(t, isRefreshRequredError(err))
}
Loading