Skip to content

Commit

Permalink
Implement native file bucket with change detection (#132)
Browse files Browse the repository at this point in the history
Replace gocloud file:// protocol bucket with native go file access, etag based on modtime
  • Loading branch information
msbarry authored Feb 9, 2024
1 parent 42c3285 commit d136ff4
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 3 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
name: test

on: [push]
on:
pull_request:
branches:
- "main"

jobs:
test:
Expand All @@ -12,15 +15,15 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: '^1.21.0'
go-version: "^1.21.0"
- run: go test ./pmtiles
fmt_vet_lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
with:
go-version: '^1.21.0'
go-version: "^1.21.0"
- run: if [ "$(gofmt -s -l . | wc -l)" -gt 0 ]; then exit 1; fi
- run: go vet caddy/pmtiles_proxy.go
- run: go vet main.go
Expand Down
51 changes: 51 additions & 0 deletions pmtiles/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,48 @@ func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int
return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, nil
}

// FileBucket is a Bucket implementation that serves keys as files located
// underneath a single directory on the local filesystem.
type FileBucket struct {
	// path is the directory that keys are resolved against.
	path string
}

// NewRangeReader returns a reader over length bytes of the file named key,
// starting at offset. It delegates to NewRangeReaderEtag with an empty etag
// and discards the etag that call returns.
func (b FileBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
	reader, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
	return reader, err
}

// NewRangeReaderEtag reads length bytes starting at offset from the file named
// key inside the bucket directory and returns them together with the file's
// current etag.
//
// The etag is the double-quoted hex MD5 of the string
// "<modtime in unix nanoseconds> <size in bytes>", so it is derived purely from
// file metadata, not from file contents. When etag is non-empty and does not
// match the file's current etag, a *RefreshRequiredError is returned and no
// data is read. An empty etag skips that comparison.
//
// The requested range is copied fully into memory and the file is closed
// before returning; the returned ReadCloser serves the bytes from that buffer.
// Errors from opening, stat-ing or reading the file are returned unchanged. A
// negative length is rejected with an error.
//
// NOTE(review): key is joined onto the bucket path without validation; whether
// callers guarantee it cannot escape the directory (e.g. via "..") is not
// visible here — confirm.
func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
	// make() panics on a negative size, so reject it up front.
	if length < 0 {
		return nil, "", fmt.Errorf("invalid negative read length %d", length)
	}

	name := filepath.Join(b.path, key)
	file, err := os.Open(name)
	if err != nil {
		return nil, "", err
	}
	// Register the close only once the open has succeeded; on failure file is
	// nil and there is nothing to release.
	defer file.Close()

	info, err := file.Stat()
	if err != nil {
		return nil, "", err
	}

	modInfo := fmt.Sprintf("%d %d", info.ModTime().UnixNano(), info.Size())
	hash := md5.Sum([]byte(modInfo))
	newEtag := fmt.Sprintf(`"%s"`, hex.EncodeToString(hash[:]))
	if len(etag) > 0 && etag != newEtag {
		return nil, "", &RefreshRequiredError{}
	}

	result := make([]byte, length)
	read, err := file.ReadAt(result, offset)
	if err != nil {
		return nil, "", err
	}
	// Defensive: ReadAt is documented to return an error on a short read, but
	// never hand back a partially filled buffer.
	if read != int(length) {
		return nil, "", fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
	}
	return io.NopCloser(bytes.NewReader(result)), newEtag, nil
}

// Close is a no-op that always returns nil: every read opens and closes its
// own file handle, so the bucket itself holds nothing to release.
func (FileBucket) Close() error {
	return nil
}

// HTTPClient is an interface that lets you swap out the default client with a mock one in tests
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
Expand Down Expand Up @@ -198,6 +240,15 @@ func OpenBucket(ctx context.Context, bucketURL string, bucketPrefix string) (Buc
bucket := HTTPBucket{bucketURL, http.DefaultClient}
return bucket, nil
}
if strings.HasPrefix(bucketURL, "file") {
fileprotocol := "file://"
if string(os.PathSeparator) != "/" {
fileprotocol += "/"
}
path := strings.Replace(bucketURL, fileprotocol, "", 1)
bucket := FileBucket{filepath.FromSlash(path)}
return bucket, nil
}
bucket, err := blob.OpenBucket(ctx, bucketURL)
if err != nil {
return nil, err
Expand Down
69 changes: 69 additions & 0 deletions pmtiles/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package pmtiles

import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"testing"

"github.com/stretchr/testify/assert"
_ "gocloud.dev/blob/fileblob"
)

func TestNormalizeLocalFile(t *testing.T) {
Expand Down Expand Up @@ -114,3 +117,69 @@ func TestHttpBucketRequestRequestEtagFailed(t *testing.T) {
_, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.False(t, isRefreshRequredError(err))
}

// TestFileBucketReplace checks that overwriting a file in place yields a new
// etag, serves the new contents, and makes a request carrying the old etag
// fail with a refresh-required error.
func TestFileBucketReplace(t *testing.T) {
	dir := t.TempDir()
	archive := filepath.Join(dir, "archive.pmtiles")
	ctx := context.Background()

	bucketURL, _, err := NormalizeBucketKey("", dir, "")
	assert.Nil(t, err)
	fmt.Println(bucketURL)
	bucket, err := OpenBucket(ctx, bucketURL, "")
	assert.Nil(t, err)
	assert.NotNil(t, bucket)
	assert.Nil(t, os.WriteFile(archive, []byte{1, 2, 3}, 0666))

	// Initial read: byte at offset 1 of the original contents.
	firstReader, etag1, err := bucket.NewRangeReaderEtag(ctx, "archive.pmtiles", 1, 1, "")
	assert.Nil(t, err)
	got, err := io.ReadAll(firstReader)
	assert.Nil(t, err)
	assert.Equal(t, []byte{2}, got)

	// Overwrite the file with different contents; the etag must change.
	assert.Nil(t, os.WriteFile(archive, []byte{4, 5, 6, 7}, 0666))
	secondReader, etag2, err := bucket.NewRangeReaderEtag(ctx, "archive.pmtiles", 1, 1, "")
	assert.Nil(t, err)
	got, err = io.ReadAll(secondReader)
	assert.Nil(t, err)
	assert.NotEqual(t, etag1, etag2)
	assert.Equal(t, []byte{5}, got)

	// A request pinned to the stale etag must report that a refresh is needed.
	_, _, err = bucket.NewRangeReaderEtag(ctx, "archive.pmtiles", 1, 1, etag1)
	assert.True(t, isRefreshRequredError(err))
}

// TestFileBucketRename checks that swapping a different file into place under
// the same key (by renaming) yields a new etag, serves the new contents, and
// makes a request carrying the old etag fail with a refresh-required error.
func TestFileBucketRename(t *testing.T) {
	tmp := t.TempDir()
	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))
	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive2.pmtiles"), []byte{4, 5, 6, 7}, 0666))

	bucketURL, _, err := NormalizeBucketKey("", tmp, "")
	assert.Nil(t, err)
	fmt.Println(bucketURL)
	bucket, err := OpenBucket(context.Background(), bucketURL, "")
	assert.Nil(t, err)
	assert.NotNil(t, bucket)
	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))

	// first read from file
	reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
	assert.Nil(t, err)
	data, err := io.ReadAll(reader)
	assert.Nil(t, err)
	assert.Equal(t, []byte{2}, data)

	// Move the original aside and rename the second file over the key. Assert
	// on each rename so a failed move is reported here rather than later as a
	// confusing etag or data mismatch.
	assert.Nil(t, os.Rename(filepath.Join(tmp, "archive.pmtiles"), filepath.Join(tmp, "archive3.pmtiles")))
	assert.Nil(t, os.Rename(filepath.Join(tmp, "archive2.pmtiles"), filepath.Join(tmp, "archive.pmtiles")))
	reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
	assert.Nil(t, err)
	data, err = io.ReadAll(reader)
	assert.Nil(t, err)
	assert.NotEqual(t, etag1, etag2)
	assert.Equal(t, []byte{5}, data)

	// and requesting with old etag fails with refresh required error
	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
	assert.True(t, isRefreshRequredError(err))
}

0 comments on commit d136ff4

Please sign in to comment.