Skip to content

Commit

Permalink
give more info in caching_fetcher error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
harrysarson committed Jul 29, 2024
1 parent 08705e5 commit 5f8b6c1
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 40 deletions.
1 change: 1 addition & 0 deletions pkg/fetch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
importpath = "github.com/buildbarn/bb-remote-asset/pkg/fetch",
visibility = ["//visibility:public"],
deps = [
"//pkg/proto/asset",
"//pkg/qualifier",
"//pkg/storage",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/asset/v1:asset",
Expand Down
96 changes: 58 additions & 38 deletions pkg/fetch/caching_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package fetch

import (
"context"
"errors"
"fmt"
"time"

"github.com/buildbarn/bb-remote-asset/pkg/proto/asset"
"github.com/buildbarn/bb-remote-asset/pkg/qualifier"
"github.com/buildbarn/bb-remote-asset/pkg/storage"
bb_digest "github.com/buildbarn/bb-storage/pkg/digest"
Expand Down Expand Up @@ -44,30 +47,16 @@ func (cf *cachingFetcher) FetchBlob(ctx context.Context, req *remoteasset.FetchB
oldestContentAccepted = req.OldestContentAccepted.AsTime()
}

allCachingErrors := []error{}

// Check assetStore
for _, uri := range req.Uris {
assetRef := storage.NewAssetReference([]string{uri}, req.Qualifiers)
assetData, err := cf.assetStore.Get(ctx, assetRef, instanceName)
assetData, err := getAndCheckAsset(ctx, cf.assetStore, uri, req.Qualifiers, instanceName, oldestContentAccepted)
if err != nil {
allCachingErrors = append(allCachingErrors, err)
continue
}

// Check whether the asset has expired, making sure ExpireAt was set
if assetData.ExpireAt != nil {
expireTime := assetData.ExpireAt.AsTime()
if expireTime.Before(time.Now()) && !expireTime.Equal(time.Unix(0, 0)) {
continue
}
}

// Check that content is newer than the oldest accepted by the request
if oldestContentAccepted != time.Unix(0, 0) {
updateTime := assetData.LastUpdated.AsTime()
if updateTime.Before(oldestContentAccepted) {
continue
}
}

// Successful retrieval from the asset reference cache
return &remoteasset.FetchBlobResponse{
Status: status.New(codes.OK, "Blob fetched successfully from asset cache").Proto(),
Expand All @@ -81,7 +70,13 @@ func (cf *cachingFetcher) FetchBlob(ctx context.Context, req *remoteasset.FetchB
// Fetch from wrapped fetcher
response, err := cf.fetcher.FetchBlob(ctx, req)
if err != nil {
return nil, err
errAsStatus := status.Convert(err)
return nil, status.Errorf(
errAsStatus.Code(),
"%s (retrieving cached blob failed with: %v)",
errAsStatus.Message(),
errors.Join(allCachingErrors...),
)
}
if response.Status.Code != 0 {
return response, nil
Expand All @@ -106,6 +101,39 @@ func (cf *cachingFetcher) FetchBlob(ctx context.Context, req *remoteasset.FetchB
return response, nil
}

func getAndCheckAsset(
ctx context.Context,
assetStore storage.AssetStore,
uri string,
qualifiers []*remoteasset.Qualifier,
instanceName bb_digest.InstanceName,
oldestContentAccepted time.Time,
) (*asset.Asset, error) {
assetRef := storage.NewAssetReference([]string{uri}, qualifiers)
assetData, err := assetStore.Get(ctx, assetRef, instanceName)
if err != nil {
return nil, err
}

// Check whether the asset has expired, making sure ExpireAt was set
if assetData.ExpireAt != nil {
expireTime := assetData.ExpireAt.AsTime()
if expireTime.Before(time.Now()) && !expireTime.Equal(time.Unix(0, 0)) {
return nil, fmt.Errorf("Asset expired at %v", expireTime)
}
}

// Check that content is newer than the oldest accepted by the request
if oldestContentAccepted != time.Unix(0, 0) {
updateTime := assetData.LastUpdated.AsTime()
if updateTime.Before(oldestContentAccepted) {
return nil, fmt.Errorf("Asset older than %v", oldestContentAccepted)
}
}

return assetData, nil
}

func (cf *cachingFetcher) FetchDirectory(ctx context.Context, req *remoteasset.FetchDirectoryRequest) (*remoteasset.FetchDirectoryResponse, error) {
instanceName, err := bb_digest.NewInstanceName(req.InstanceName)
if err != nil {
Expand All @@ -117,30 +145,16 @@ func (cf *cachingFetcher) FetchDirectory(ctx context.Context, req *remoteasset.F
oldestContentAccepted = req.OldestContentAccepted.AsTime()
}

allCachingErrors := []error{}

// Check refStore
for _, uri := range req.Uris {
assetRef := storage.NewAssetReference([]string{uri}, req.Qualifiers)
assetData, err := cf.assetStore.Get(ctx, assetRef, instanceName)
assetData, err := getAndCheckAsset(ctx, cf.assetStore, uri, req.Qualifiers, instanceName, oldestContentAccepted)
if err != nil {
allCachingErrors = append(allCachingErrors, err)
continue
}

// Check whether the asset has expired, making sure ExpireAt was set
if assetData.ExpireAt != nil {
expireTime := assetData.ExpireAt.AsTime()
if expireTime.Before(time.Now()) && !expireTime.Equal(time.Unix(0, 0)) {
continue
}
}

// Check that content is newer than the oldest accepted by the request
if oldestContentAccepted != time.Unix(0, 0) {
updateTime := assetData.LastUpdated.AsTime()
if updateTime.Before(oldestContentAccepted) {
continue
}
}

// Successful retrieval from the asset reference cache
return &remoteasset.FetchDirectoryResponse{
Status: status.New(codes.OK, "Directory fetched successfully from asset cache").Proto(),
Expand All @@ -154,7 +168,13 @@ func (cf *cachingFetcher) FetchDirectory(ctx context.Context, req *remoteasset.F
// Fetch from wrapped fetcher
response, err := cf.fetcher.FetchDirectory(ctx, req)
if err != nil {
return nil, err
errAsStatus := status.Convert(err)
return nil, status.Errorf(
errAsStatus.Code(),
"%s (retrieving cached directory failed with: %v)",
errAsStatus.Message(),
errors.Join(allCachingErrors...),
)
}

// Cache fetched blob with single URI
Expand Down
11 changes: 9 additions & 2 deletions pkg/fetch/caching_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,11 @@ func TestCachingFetcherExpiry(t *testing.T) {
cacheFetcher := fetch.NewCachingFetcher(baseFetcher, assetStore)

_, err = cacheFetcher.FetchBlob(ctx, request)
require.Equal(t, status.ErrorProto(&protostatus.Status{Code: 5, Message: "Not found"}), err)

errAsStatus := status.Convert(err)
require.Contains(t, errAsStatus.Message(), "Not found")
require.Contains(t, errAsStatus.Message(), "Asset expired at")
require.Equal(t, errAsStatus.Code(), codes.NotFound)
}

func TestCachingFetcherOldestContentAccepted(t *testing.T) {
Expand Down Expand Up @@ -206,5 +210,8 @@ func TestCachingFetcherOldestContentAccepted(t *testing.T) {
cacheFetcher := fetch.NewCachingFetcher(baseFetcher, assetStore)

_, err = cacheFetcher.FetchBlob(ctx, request)
require.Equal(t, status.ErrorProto(&protostatus.Status{Code: 5, Message: "Not found"}), err)
errAsStatus := status.Convert(err)
require.Contains(t, errAsStatus.Message(), "Not found")
require.Contains(t, errAsStatus.Message(), "Asset older than")
require.Equal(t, errAsStatus.Code(), codes.NotFound)
}

0 comments on commit 5f8b6c1

Please sign in to comment.