From d136ff48e1816ec4d3b9c05d87a1faef765e49c2 Mon Sep 17 00:00:00 2001 From: Michael Barry Date: Fri, 9 Feb 2024 07:27:07 -0500 Subject: [PATCH] Implement native file bucket with change detection (#132) Replace gocloud file:// protocol bucket with native go file access, etag based on modtime --- .github/workflows/test.yml | 9 +++-- pmtiles/bucket.go | 51 ++++++++++++++++++++++++++++ pmtiles/bucket_test.go | 69 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9e560c0..f34ab29 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,6 +1,9 @@ name: test -on: [push] +on: + pull_request: + branches: + - "main" jobs: test: @@ -12,7 +15,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 with: - go-version: '^1.21.0' + go-version: "^1.21.0" - run: go test ./pmtiles fmt_vet_lint: runs-on: ubuntu-latest @@ -20,7 +23,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 with: - go-version: '^1.21.0' + go-version: "^1.21.0" - run: if [ "$(gofmt -s -l . 
| wc -l)" -gt 0 ]; then exit 1; fi - run: go vet caddy/pmtiles_proxy.go - run: go vet main.go
diff --git a/pmtiles/bucket.go b/pmtiles/bucket.go
index 979730d..a8cc269 100644
--- a/pmtiles/bucket.go
+++ b/pmtiles/bucket.go
@@ -67,6 +67,68 @@ func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int
 	return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, nil
 }
 
+// FileBucket is a bucket backed by a directory on disk; keys are joined onto
+// path to locate files.
+type FileBucket struct {
+	path string
+}
+
+// NewRangeReader returns a reader over length bytes of key starting at offset.
+// It calls NewRangeReaderEtag with an empty etag, so no change detection is
+// performed, and discards the etag that call computes.
+func (b FileBucket) NewRangeReader(ctx context.Context, key string, offset, length int64) (io.ReadCloser, error) {
+	body, _, err := b.NewRangeReaderEtag(ctx, key, offset, length, "")
+	return body, err
+}
+
+// NewRangeReaderEtag reads length bytes of key starting at offset into memory
+// and returns them along with the file's current etag.
+//
+// The etag is the quoted hex MD5 of the file's modification time in
+// nanoseconds and its size, so it changes whenever either of those changes. If
+// etag is non-empty and differs from the current etag, a *RefreshRequiredError
+// is returned and nothing is read. Errors from opening, stat-ing or reading the
+// file are returned unchanged; a range that runs past the end of the file fails
+// with the error from ReadAt (io.EOF), and a negative length is rejected.
+func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, length int64, etag string) (io.ReadCloser, string, error) {
+	if length < 0 {
+		// make panics on a negative size; report the bad range as an error.
+		return nil, "", fmt.Errorf("invalid range length %d", length)
+	}
+	// NOTE(review): key is joined as-is, so a key containing ".." can resolve
+	// outside b.path — confirm that callers sanitize keys.
+	name := filepath.Join(b.path, key)
+	file, err := os.Open(name)
+	if err != nil {
+		return nil, "", err
+	}
+	// Register the close only after Open succeeded, when file is known non-nil.
+	defer file.Close()
+	info, err := file.Stat()
+	if err != nil {
+		return nil, "", err
+	}
+	modInfo := fmt.Sprintf("%d %d", info.ModTime().UnixNano(), info.Size())
+	hash := md5.Sum([]byte(modInfo))
+	newEtag := fmt.Sprintf(`"%s"`, hex.EncodeToString(hash[:]))
+	if len(etag) > 0 && etag != newEtag {
+		return nil, "", &RefreshRequiredError{}
+	}
+	result := make([]byte, length)
+	// ReadAt reports a non-nil error whenever it fills fewer than len(result)
+	// bytes, so a nil error already guarantees the whole range was read.
+	if _, err := file.ReadAt(result, offset); err != nil {
+		return nil, "", err
+	}
+	return io.NopCloser(bytes.NewReader(result)), newEtag, nil
+}
+
+// Close is a no-op that always returns nil; FileBucket keeps no open handles
+// between calls.
+func (b FileBucket) Close() error {
+	return nil
+}
+
 // HTTPClient is an interface that lets you swap out the default client with a mock one in tests
 type HTTPClient interface {
 	Do(req *http.Request) (*http.Response, error)
@@ -198,6 +260,15 @@ func OpenBucket(ctx context.Context, bucketURL string, bucketPrefix string) (Buc
 		bucket := HTTPBucket{bucketURL,
http.DefaultClient}
 		return bucket, nil
 	}
+	if strings.HasPrefix(bucketURL, "file") {
+		fileprotocol := "file://"
+		if string(os.PathSeparator) != "/" {
+			fileprotocol += "/"
+		}
+		path := strings.Replace(bucketURL, fileprotocol, "", 1)
+		bucket := FileBucket{filepath.FromSlash(path)}
+		return bucket, nil
+	}
 	bucket, err := blob.OpenBucket(ctx, bucketURL)
 	if err != nil {
 		return nil, err
diff --git a/pmtiles/bucket_test.go b/pmtiles/bucket_test.go
index f65681b..257b2f0 100644
--- a/pmtiles/bucket_test.go
+++ b/pmtiles/bucket_test.go
@@ -2,13 +2,15 @@ package pmtiles
 
 import (
 	"context"
 	"io"
 	"net/http"
 	"os"
+	"path/filepath"
 	"strings"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
+	_ "gocloud.dev/blob/fileblob"
 )
 
 func TestNormalizeLocalFile(t *testing.T) {
@@ -114,3 +116,74 @@ func TestHttpBucketRequestRequestEtagFailed(t *testing.T) {
 	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
 	assert.False(t, isRefreshRequredError(err))
 }
+
+// TestFileBucketReplace checks that overwriting a file in place changes the
+// etag the bucket reports and that a read using the old etag is refused.
+func TestFileBucketReplace(t *testing.T) {
+	tmp := t.TempDir()
+	bucketURL, _, err := NormalizeBucketKey("", tmp, "")
+	assert.Nil(t, err)
+	t.Log(bucketURL)
+	bucket, err := OpenBucket(context.Background(), bucketURL, "")
+	assert.Nil(t, err)
+	assert.NotNil(t, bucket)
+	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))
+
+	// first read from file
+	reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	assert.Nil(t, err)
+	data, err := io.ReadAll(reader)
+	assert.Nil(t, err)
+	assert.Equal(t, []byte{2}, data)
+
+	// change file, verify etag changes
+	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{4, 5, 6, 7}, 0666))
+	reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	assert.Nil(t, err)
+	data, err = io.ReadAll(reader)
+	assert.Nil(t, err)
+	assert.NotEqual(t, etag1, etag2)
+	assert.Equal(t, []byte{5}, data)
+
+	// and requesting with old etag fails with refresh required error
+	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
+	assert.True(t, isRefreshRequredError(err))
+}
+
+// TestFileBucketRename checks that renaming a different file over the key
+// changes the etag the bucket reports and that a read using the old etag is
+// refused.
+func TestFileBucketRename(t *testing.T) {
+	tmp := t.TempDir()
+	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))
+	assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive2.pmtiles"), []byte{4, 5, 6, 7}, 0666))
+
+	bucketURL, _, err := NormalizeBucketKey("", tmp, "")
+	assert.Nil(t, err)
+	t.Log(bucketURL)
+	bucket, err := OpenBucket(context.Background(), bucketURL, "")
+	assert.Nil(t, err)
+	assert.NotNil(t, bucket)
+
+	// first read from file
+	reader, etag1, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	assert.Nil(t, err)
+	data, err := io.ReadAll(reader)
+	assert.Nil(t, err)
+	assert.Equal(t, []byte{2}, data)
+
+	// move the original aside and rename the second file over the key; a
+	// failed rename would leave the old file in place, so check both errors
+	assert.Nil(t, os.Rename(filepath.Join(tmp, "archive.pmtiles"), filepath.Join(tmp, "archive3.pmtiles")))
+	assert.Nil(t, os.Rename(filepath.Join(tmp, "archive2.pmtiles"), filepath.Join(tmp, "archive.pmtiles")))
+	reader, etag2, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, "")
+	assert.Nil(t, err)
+	data, err = io.ReadAll(reader)
+	assert.Nil(t, err)
+	assert.NotEqual(t, etag1, etag2)
+	assert.Equal(t, []byte{5}, data)
+
+	// and requesting with old etag fails with refresh required error
+	_, _, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
+	assert.True(t, isRefreshRequredError(err))
+}