From eabe6bb964658e7ce152b1aaf1af5d5999372b2e Mon Sep 17 00:00:00 2001
From: Harry Sarson
Date: Thu, 11 Jul 2024 16:04:08 +0100
Subject: [PATCH] AC: use a RootDirectoryDigest for directory assets

This means we avoid having to calculate TreeDigests whilst still creating
Actions/ActionResults that encode that an asset is a directory.

Note: the bb-remote-asset server does not validate that the
RootDirectoryDigest actually references a Directory proto, because
bb-remote-asset treats the digest as opaque. See
https://github.com/buildbarn/bb-remote-asset/pull/34#issuecomment-2220546603
---
 pkg/fetch/caching_fetcher.go | 4 +-
 pkg/fetch/caching_fetcher_test.go | 9 +-
 pkg/proto/asset/asset.pb.go | 106 ++++++++--
 pkg/proto/asset/asset.proto | 11 ++
 pkg/push/push_server.go | 8 +-
 pkg/push/push_server_test.go | 2 +
 pkg/storage/action_cache_asset_store.go | 70 +++++--
 pkg/storage/action_cache_asset_store_test.go | 195 +++++++++++++++++--
 pkg/storage/asset.go | 13 +-
 pkg/storage/asset_reference.go | 5 +-
 pkg/storage/authorizing_asset_store_test.go | 4 +-
 pkg/storage/blob_access_asset_store_test.go | 2 +-
 12 files changed, 368 insertions(+), 61 deletions(-)

diff --git a/pkg/fetch/caching_fetcher.go b/pkg/fetch/caching_fetcher.go
index 5c94cad..e89e336 100644
--- a/pkg/fetch/caching_fetcher.go
+++ b/pkg/fetch/caching_fetcher.go
@@ -89,7 +89,7 @@ func (cf *cachingFetcher) FetchBlob(ctx context.Context, req *remoteasset.FetchB
 // Cache fetched blob with single URI
 assetRef := storage.NewAssetReference([]string{response.Uri}, response.Qualifiers)
- assetData := storage.NewAsset(response.BlobDigest, getDefaultTimestamp())
+ assetData := storage.NewBlobAsset(response.BlobDigest, getDefaultTimestamp())
 err = cf.assetStore.Put(ctx, assetRef, assetData, instanceName)
 if err != nil {
 return response, err
@@ -159,7 +159,7 @@ func (cf *cachingFetcher) FetchDirectory(ctx context.Context, req *remoteasset.F
 // Cache fetched blob with single URI
 assetRef := storage.NewAssetReference([]string{response.Uri}, response.Qualifiers)
- assetData := storage.NewAsset(response.RootDirectoryDigest, getDefaultTimestamp())
+ assetData := storage.NewDirectoryAsset(response.RootDirectoryDigest, getDefaultTimestamp())
 err = cf.assetStore.Put(ctx, assetRef, assetData, instanceName)
 if err != nil {
 return response, err
diff --git a/pkg/fetch/caching_fetcher_test.go b/pkg/fetch/caching_fetcher_test.go
index 3dc54ae..1f9db8a 100644
--- a/pkg/fetch/caching_fetcher_test.go
+++ b/pkg/fetch/caching_fetcher_test.go
@@ -38,6 +38,8 @@ func TestFetchBlobCaching(t *testing.T) {
 refDigest, err := storage.AssetReferenceToDigest(storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}), instanceName)
 require.NoError(t, err)
+ t.Logf("Ref digest was %v", refDigest)
+
 backend := mock.NewMockBlobAccess(ctrl)
 assetStore := storage.NewBlobAccessAssetStore(backend, 16*1024*1024)
 mockFetcher := mock.NewMockFetcher(ctrl)
@@ -56,6 +58,7 @@
 require.NoError(t, err)
 a := m.(*asset.Asset)
 require.True(t, proto.Equal(a.Digest, blobDigest))
+ require.Equal(t, asset.Asset_BLOB, a.Type)
 return nil
 }).After(fetchBlobCall)
 response, err := cachingFetcher.FetchBlob(ctx, request)
@@ -71,7 +74,7 @@
 })
 t.Run("Cached", func(t *testing.T) {
- backend.EXPECT().Get(ctx, refDigest).Return(buffer.NewProtoBufferFromProto(storage.NewAsset(blobDigest, nil), buffer.UserProvided))
+ backend.EXPECT().Get(ctx, 
refDigest).Return(buffer.NewProtoBufferFromProto(storage.NewBlobAsset(blobDigest, nil), buffer.UserProvided)) response, err := cachingFetcher.FetchBlob(ctx, request) require.Nil(t, err) require.Equal(t, response.Status.Code, int32(codes.OK)) @@ -111,6 +114,7 @@ func TestFetchDirectoryCaching(t *testing.T) { require.NoError(t, err) a := m.(*asset.Asset) require.True(t, proto.Equal(a.Digest, dirDigest)) + require.Equal(t, asset.Asset_DIRECTORY, a.Type) return nil }).After(fetchDirectoryCall) response, err := cachingFetcher.FetchDirectory(ctx, request) @@ -126,7 +130,7 @@ func TestFetchDirectoryCaching(t *testing.T) { }) t.Run("Cached", func(t *testing.T) { - backend.EXPECT().Get(ctx, refDigest).Return(buffer.NewProtoBufferFromProto(storage.NewAsset(dirDigest, nil), buffer.UserProvided)) + backend.EXPECT().Get(ctx, refDigest).Return(buffer.NewProtoBufferFromProto(storage.NewBlobAsset(dirDigest, nil), buffer.UserProvided)) response, err := cachingFetcher.FetchDirectory(ctx, request) require.Nil(t, err) require.Equal(t, response.Status.Code, int32(codes.OK)) @@ -191,6 +195,7 @@ func TestCachingFetcherOldestContentAccepted(t *testing.T) { SizeBytes: 234, }, LastUpdated: ts, + Type: asset.Asset_BLOB, }, buffer.UserProvided) backend.EXPECT().Get(ctx, refDigest).Return(buf) assetStore := storage.NewBlobAccessAssetStore(backend, 16*1024*1024) diff --git a/pkg/proto/asset/asset.pb.go b/pkg/proto/asset/asset.pb.go index 5b0fc73..6d7c50a 100644 --- a/pkg/proto/asset/asset.pb.go +++ b/pkg/proto/asset/asset.pb.go @@ -23,6 +23,52 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type Asset_AssetType int32 + +const ( + Asset_BLOB Asset_AssetType = 0 + Asset_DIRECTORY Asset_AssetType = 1 +) + +// Enum value maps for Asset_AssetType. +var ( + Asset_AssetType_name = map[int32]string{ + 0: "BLOB", + 1: "DIRECTORY", + } + Asset_AssetType_value = map[string]int32{ + "BLOB": 0, + "DIRECTORY": 1, + } +) + +func (x Asset_AssetType) Enum() *Asset_AssetType { + p := new(Asset_AssetType) + *p = x + return p +} + +func (x Asset_AssetType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Asset_AssetType) Descriptor() protoreflect.EnumDescriptor { + return file_pkg_proto_asset_asset_proto_enumTypes[0].Descriptor() +} + +func (Asset_AssetType) Type() protoreflect.EnumType { + return &file_pkg_proto_asset_asset_proto_enumTypes[0] +} + +func (x Asset_AssetType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Asset_AssetType.Descriptor instead. 
+func (Asset_AssetType) EnumDescriptor() ([]byte, []int) { + return file_pkg_proto_asset_asset_proto_rawDescGZIP(), []int{1, 0} +} + type AssetReference struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -86,6 +132,7 @@ type Asset struct { Digest *v2.Digest `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"` ExpireAt *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=expire_at,json=expireAt,proto3" json:"expire_at,omitempty"` LastUpdated *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=last_updated,json=lastUpdated,proto3" json:"last_updated,omitempty"` + Type Asset_AssetType `protobuf:"varint,4,opt,name=type,proto3,enum=buildbarn.asset.Asset_AssetType" json:"type,omitempty"` } func (x *Asset) Reset() { @@ -141,6 +188,13 @@ func (x *Asset) GetLastUpdated() *timestamppb.Timestamp { return nil } +func (x *Asset) GetType() Asset_AssetType { + if x != nil { + return x.Type + } + return Asset_BLOB +} + var File_pkg_proto_asset_asset_proto protoreflect.FileDescriptor var file_pkg_proto_asset_asset_proto_rawDesc = []byte{ @@ -162,7 +216,7 @@ var file_pkg_proto_asset_asset_proto_rawDesc = []byte{ 0x0b, 0x32, 0x26, 0x2e, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x2e, 0x62, 0x61, 0x7a, 0x65, 0x6c, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x51, 0x75, 0x61, 0x6c, 0x69, 0x66, 0x69, 0x65, 0x72, 0x52, 0x0a, 0x71, 0x75, 0x61, 0x6c, 0x69, - 0x66, 0x69, 0x65, 0x72, 0x73, 0x22, 0xc0, 0x01, 0x0a, 0x05, 0x41, 0x73, 0x73, 0x65, 0x74, 0x12, + 0x66, 0x69, 0x65, 0x72, 0x73, 0x22, 0x9c, 0x02, 0x0a, 0x05, 0x41, 0x73, 0x73, 0x65, 0x74, 0x12, 0x3f, 0x0a, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x2e, 0x62, 0x61, 0x7a, 0x65, 0x6c, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2e, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x76, @@ -174,11 +228,17 @@ var file_pkg_proto_asset_asset_proto_rawDesc = []byte{ 0x74, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x6c, 0x61, 0x73, - 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x62, 0x61, 0x72, 0x6e, - 0x2f, 0x62, 0x62, 0x2d, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2d, 0x61, 0x73, 0x73, 0x65, 0x74, - 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x61, 0x73, 0x73, 0x65, 0x74, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x12, 0x34, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x20, 0x2e, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x62, 0x61, + 0x72, 0x6e, 0x2e, 0x61, 0x73, 0x73, 0x65, 0x74, 0x2e, 0x41, 0x73, 0x73, 0x65, 0x74, 0x2e, 0x41, + 0x73, 0x73, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x24, + 0x0a, 0x09, 0x41, 0x73, 0x73, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x42, + 0x4c, 0x4f, 0x42, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x44, 0x49, 0x52, 0x45, 0x43, 0x54, 0x4f, + 0x52, 0x59, 0x10, 0x01, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x62, 0x61, 0x72, 0x6e, 0x2f, 0x62, 0x62, 0x2d, + 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2d, 
0x61, 0x73, 0x73, 0x65, 0x74, 0x2f, 0x70, 0x6b, 0x67, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x61, 0x73, 0x73, 0x65, 0x74, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -193,24 +253,27 @@ func file_pkg_proto_asset_asset_proto_rawDescGZIP() []byte { return file_pkg_proto_asset_asset_proto_rawDescData } +var file_pkg_proto_asset_asset_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_pkg_proto_asset_asset_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_pkg_proto_asset_asset_proto_goTypes = []interface{}{ - (*AssetReference)(nil), // 0: buildbarn.asset.AssetReference - (*Asset)(nil), // 1: buildbarn.asset.Asset - (*v1.Qualifier)(nil), // 2: build.bazel.remote.asset.v1.Qualifier - (*v2.Digest)(nil), // 3: build.bazel.remote.execution.v2.Digest - (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp + (Asset_AssetType)(0), // 0: buildbarn.asset.Asset.AssetType + (*AssetReference)(nil), // 1: buildbarn.asset.AssetReference + (*Asset)(nil), // 2: buildbarn.asset.Asset + (*v1.Qualifier)(nil), // 3: build.bazel.remote.asset.v1.Qualifier + (*v2.Digest)(nil), // 4: build.bazel.remote.execution.v2.Digest + (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp } var file_pkg_proto_asset_asset_proto_depIdxs = []int32{ - 2, // 0: buildbarn.asset.AssetReference.qualifiers:type_name -> build.bazel.remote.asset.v1.Qualifier - 3, // 1: buildbarn.asset.Asset.digest:type_name -> build.bazel.remote.execution.v2.Digest - 4, // 2: buildbarn.asset.Asset.expire_at:type_name -> google.protobuf.Timestamp - 4, // 3: buildbarn.asset.Asset.last_updated:type_name -> google.protobuf.Timestamp - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 3, // 0: buildbarn.asset.AssetReference.qualifiers:type_name -> build.bazel.remote.asset.v1.Qualifier + 4, // 1: buildbarn.asset.Asset.digest:type_name -> build.bazel.remote.execution.v2.Digest + 5, // 2: buildbarn.asset.Asset.expire_at:type_name -> google.protobuf.Timestamp + 5, // 3: buildbarn.asset.Asset.last_updated:type_name -> google.protobuf.Timestamp + 0, // 4: buildbarn.asset.Asset.type:type_name -> buildbarn.asset.Asset.AssetType + 5, // [5:5] is the sub-list for method output_type + 5, // [5:5] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_pkg_proto_asset_asset_proto_init() } @@ -249,13 +312,14 @@ func file_pkg_proto_asset_asset_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_proto_asset_asset_proto_rawDesc, - NumEnums: 0, + NumEnums: 1, NumMessages: 2, NumExtensions: 0, NumServices: 0, }, GoTypes: file_pkg_proto_asset_asset_proto_goTypes, DependencyIndexes: file_pkg_proto_asset_asset_proto_depIdxs, + EnumInfos: file_pkg_proto_asset_asset_proto_enumTypes, MessageInfos: file_pkg_proto_asset_asset_proto_msgTypes, }.Build() File_pkg_proto_asset_asset_proto = out.File diff --git a/pkg/proto/asset/asset.proto b/pkg/proto/asset/asset.proto index f0b121d..527b40a 100644 --- a/pkg/proto/asset/asset.proto +++ b/pkg/proto/asset/asset.proto @@ -27,4 +27,15 @@ message Asset { // Time at which this Asset was last Push'd or Fetch'd from a remote into the // store google.protobuf.Timestamp 
last_updated = 3; + + enum AssetType { + // Blob asset, e.g. from PushBlob + BLOB = 0; + + // Directory asset, e.g. from PushDirectory + DIRECTORY = 1; + } + + // The type of the asset. + AssetType type = 4; } diff --git a/pkg/push/push_server.go b/pkg/push/push_server.go index 0ae7d2a..0fccb31 100644 --- a/pkg/push/push_server.go +++ b/pkg/push/push_server.go @@ -40,7 +40,7 @@ func (s *assetPushServer) PushBlob(ctx context.Context, req *remoteasset.PushBlo } assetRef := storage.NewAssetReference(req.Uris, req.Qualifiers) - assetData := storage.NewAsset(req.BlobDigest, req.ExpireAt) + assetData := storage.NewBlobAsset(req.BlobDigest, req.ExpireAt) err = s.assetStore.Put(ctx, assetRef, assetData, instanceName) if err != nil { return nil, err @@ -49,7 +49,7 @@ func (s *assetPushServer) PushBlob(ctx context.Context, req *remoteasset.PushBlo if len(req.Uris) > 1 { for _, uri := range req.Uris { assetRef := storage.NewAssetReference([]string{uri}, req.Qualifiers) - assetData := storage.NewAsset(req.BlobDigest, req.ExpireAt) + assetData := storage.NewBlobAsset(req.BlobDigest, req.ExpireAt) err = s.assetStore.Put(ctx, assetRef, assetData, instanceName) if err != nil { return nil, err @@ -74,7 +74,7 @@ func (s *assetPushServer) PushDirectory(ctx context.Context, req *remoteasset.Pu } assetRef := storage.NewAssetReference(req.Uris, req.Qualifiers) - assetData := storage.NewAsset(req.RootDirectoryDigest, req.ExpireAt) + assetData := storage.NewDirectoryAsset(req.RootDirectoryDigest, req.ExpireAt) err = s.assetStore.Put(ctx, assetRef, assetData, instanceName) if err != nil { return nil, err @@ -83,7 +83,7 @@ func (s *assetPushServer) PushDirectory(ctx context.Context, req *remoteasset.Pu if len(req.Uris) > 1 { for _, uri := range req.Uris { assetRef := storage.NewAssetReference([]string{uri}, req.Qualifiers) - assetData := storage.NewAsset(req.RootDirectoryDigest, req.ExpireAt) + assetData := storage.NewDirectoryAsset(req.RootDirectoryDigest, req.ExpireAt) err = s.assetStore.Put(ctx, assetRef, assetData, instanceName) if err != nil { return nil, err diff --git a/pkg/push/push_server_test.go b/pkg/push/push_server_test.go index d218cd6..884ba19 100644 --- a/pkg/push/push_server_test.go +++ b/pkg/push/push_server_test.go @@ -52,6 +52,7 @@ func TestPushServerPushBlobSuccess(t *testing.T) { require.NoError(t, err) a := m.(*asset.Asset) require.True(t, proto.Equal(a.Digest, blobDigest)) + require.Equal(t, asset.Asset_BLOB, a.Type) return nil }) assetStore := storage.NewBlobAccessAssetStore(backend, 16*1024*1024) @@ -91,6 +92,7 @@ func TestPushServerPushDirectorySuccess(t *testing.T) { require.NoError(t, err) a := m.(*asset.Asset) require.True(t, proto.Equal(a.Digest, rootDirectoryDigest)) + require.Equal(t, asset.Asset_DIRECTORY, a.Type) return nil }) assetStore := storage.NewBlobAccessAssetStore(backend, 16*1024*1024) diff --git a/pkg/storage/action_cache_asset_store.go b/pkg/storage/action_cache_asset_store.go index 900c952..e296e71 100644 --- a/pkg/storage/action_cache_asset_store.go +++ b/pkg/storage/action_cache_asset_store.go @@ -2,6 +2,7 @@ package storage import ( "context" + "fmt" "time" remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" @@ -32,17 +33,35 @@ func NewActionCacheAssetStore(actionCache, contentAddressableStorage blobstore.B func (rs *actionCacheAssetStore) actionResultToAsset(ctx context.Context, a *remoteexecution.ActionResult, instance digest.InstanceName) (*asset.Asset, error) { digest := &remoteexecution.Digest{} - // Required output directory is 
not present, look for required - // output file - for _, file := range a.OutputFiles { - if file.Path == "out" { - digest = file.Digest + assetType := asset.Asset_DIRECTORY + + // Check if there is an output directory in the action result + for _, dir := range a.OutputDirectories { + if dir.Path == "out" { + digest = dir.RootDirectoryDigest + } + } + + if digest == nil || digest.Hash == "" { + assetType = asset.Asset_BLOB + // Required output directory is not present, look for required + // output file + for _, file := range a.OutputFiles { + if file.Path == "out" { + digest = file.Digest + } } } + + if digest == nil || digest.Hash == "" { + return nil, fmt.Errorf("could not find digest (either directory or blob) in ActionResult") + } + return &asset.Asset{ Digest: digest, ExpireAt: getDefaultTimestamp(), LastUpdated: a.ExecutionMetadata.QueuedTimestamp, + Type: assetType, }, nil } @@ -156,10 +175,22 @@ func (rs *actionCacheAssetStore) Put(ctx context.Context, ref *asset.AssetRefere } var action *remoteexecution.Action var command *remoteexecution.Command - // Create the action with the qualifier directory as the input root - action, command, err = assetReferenceToAction(ref, directoryDigest) - if err != nil { - return err + if commandGenerator, err := qualifier.QualifiersToCommand(ref.Qualifiers); err != nil || len(ref.Uris) > 1 { + // Create the action with the qualifier directory as the input root + action, command, err = assetReferenceToAction(ref, directoryDigest) + if err != nil { + return err + } + } else { + command = commandGenerator(ref.Uris[0]) + commandDigest, err := ProtoToDigest(command) + if err != nil { + return err + } + action = &remoteexecution.Action{ + CommandDigest: commandDigest, + InputRootDigest: EmptyDigest, + } } actionPb, err := proto.Marshal(action) if err != nil { @@ -196,14 +227,27 @@ func (rs *actionCacheAssetStore) Put(ctx context.Context, ref *asset.AssetRefere } actionResult := &remoteexecution.ActionResult{ - OutputFiles: []*remoteexecution.OutputFile{{ - Path: "out", - Digest: data.Digest, - }}, ExecutionMetadata: &remoteexecution.ExecutedActionMetadata{ QueuedTimestamp: data.LastUpdated, }, } + + if data.Type == asset.Asset_DIRECTORY { + // Use digest as a root directory digest + actionResult.OutputDirectories = []*remoteexecution.OutputDirectory{{ + Path: "out", + RootDirectoryDigest: data.Digest, + }} + } else if data.Type == asset.Asset_BLOB { + // Use the digest as an output file digest + actionResult.OutputFiles = []*remoteexecution.OutputFile{{ + Path: "out", + Digest: data.Digest, + }} + } else { + return fmt.Errorf("unknown asset type %v", data.Type) + } + return rs.actionCache.Put(ctx, bbActionDigest, buffer.NewProtoBufferFromProto(actionResult, buffer.UserProvided)) } diff --git a/pkg/storage/action_cache_asset_store_test.go b/pkg/storage/action_cache_asset_store_test.go index 35c6993..0bbd92f 100644 --- a/pkg/storage/action_cache_asset_store_test.go +++ b/pkg/storage/action_cache_asset_store_test.go @@ -2,12 +2,14 @@ package storage_test import ( "context" + "fmt" "testing" "time" remoteasset "github.com/bazelbuild/remote-apis/build/bazel/remote/asset/v1" remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" "github.com/buildbarn/bb-remote-asset/internal/mock" + "github.com/buildbarn/bb-remote-asset/pkg/proto/asset" "github.com/buildbarn/bb-remote-asset/pkg/storage" "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" "github.com/buildbarn/bb-storage/pkg/digest" @@ -32,7 +34,7 @@ func 
TestActionCacheAssetStorePutBlob(t *testing.T) { uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) - assetData := storage.NewAsset(blobDigest, timestamppb.Now()) + assetData := storage.NewBlobAsset(blobDigest, timestamppb.Now()) refDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, @@ -48,14 +50,14 @@ func TestActionCacheAssetStorePutBlob(t *testing.T) { actionDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "ae2ece643d2907102b1949f00721514cdda44ce7cb2c03ccd2af4dac45792d09", + "b1b0b54caccd4235e968061f380649a9720ee67909fd1027200230eba93a2427", 140, ) commandDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "e6842def39984b212641b9796c162b9e3085da84257bae614418f2255b0addc5", - 38, + "7b9d720c2fbc4e4c0fc9780208eae45f4d2c4dc23350c53dc1050259827ca459", + 40, ) ac := mock.NewMockBlobAccess(ctrl) @@ -96,8 +98,11 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) - assetData := storage.NewAsset(rootDirectoryDigest, - timestamppb.Now()) + assetData := storage.NewAsset( + rootDirectoryDigest, + asset.Asset_DIRECTORY, + timestamppb.Now(), + ) refDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, @@ -113,14 +118,14 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { actionDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "ae2ece643d2907102b1949f00721514cdda44ce7cb2c03ccd2af4dac45792d09", + "b1b0b54caccd4235e968061f380649a9720ee67909fd1027200230eba93a2427", 140, ) commandDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "e6842def39984b212641b9796c162b9e3085da84257bae614418f2255b0addc5", - 38, + "7b9d720c2fbc4e4c0fc9780208eae45f4d2c4dc23350c53dc1050259827ca459", + 40, ) ac := mock.NewMockBlobAccess(ctrl) @@ -134,9 +139,9 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000) require.NoError(t, err) a := m.(*remoteexecution.ActionResult) - for _, d := range a.OutputFiles { + for _, d := range a.OutputDirectories { if d.Path == "out" { - require.True(t, proto.Equal(d.Digest, rootDirectoryDigest)) + require.True(t, proto.Equal(d.RootDirectoryDigest, rootDirectoryDigest)) return nil } } @@ -148,6 +153,170 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { require.NoError(t, err) } +func TestActionCacheAssetStorePutMalformedDirectory(t *testing.T) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + instanceName, err := digest.NewInstanceName("") + require.NoError(t, err) + + rootDirectoryDigest := &remoteexecution.Digest{ + Hash: "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", + SizeBytes: 111, + } + uri := "https://example.com/example.txt" + assetRef := storage.NewAssetReference([]string{uri}, + []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) + assetData := storage.NewAsset( + rootDirectoryDigest, + asset.Asset_DIRECTORY, + timestamppb.Now(), + ) + refDigest := digest.MustNewDigest( + "", + remoteexecution.DigestFunction_SHA256, + "a2c2b32a289d4d9bf6e6309ed2691b6bcc04ee7923fcfd81bf1bfe0e7348139b", + 14, + ) + directoryDigest := digest.MustNewDigest( + "", + remoteexecution.DigestFunction_SHA256, + "c72e5e1e6ab54746d4fd3da7b443037187c81347a210d2ab8e5863638fbe1ac6", + 
88, + ) + actionDigest := digest.MustNewDigest( + "", + remoteexecution.DigestFunction_SHA256, + "b1b0b54caccd4235e968061f380649a9720ee67909fd1027200230eba93a2427", + 140, + ) + commandDigest := digest.MustNewDigest( + "", + remoteexecution.DigestFunction_SHA256, + "7b9d720c2fbc4e4c0fc9780208eae45f4d2c4dc23350c53dc1050259827ca459", + 40, + ) + + ac := mock.NewMockBlobAccess(ctrl) + cas := mock.NewMockBlobAccess(ctrl) + cas.EXPECT().Put(ctx, refDigest, gomock.Any()).Return(nil) + cas.EXPECT().Put(ctx, directoryDigest, gomock.Any()).Return(nil) + cas.EXPECT().Put(ctx, actionDigest, gomock.Any()).Return(nil) + cas.EXPECT().Put(ctx, commandDigest, gomock.Any()).Return(nil) + ac.EXPECT().Put(ctx, actionDigest, gomock.Any()).DoAndReturn( + func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { + m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000) + require.NoError(t, err) + a := m.(*remoteexecution.ActionResult) + for _, d := range a.OutputDirectories { + if d.Path == "out" { + require.True(t, proto.Equal(d.RootDirectoryDigest, rootDirectoryDigest)) + return nil + } + } + return status.Error(codes.Internal, "Directory digest not found") + }) + assetStore := storage.NewActionCacheAssetStore(ac, cas, 16*1024*1024) + + err = assetStore.Put(ctx, assetRef, assetData, instanceName) + require.NoError(t, err) +} + +func roundTripTest(t *testing.T, assetRef *asset.AssetReference, assetData *asset.Asset) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + instanceName, err := digest.NewInstanceName("") + require.NoError(t, err) + + var actionDigest digest.Digest + var actionResult *remoteexecution.ActionResult + + { + ac := mock.NewMockBlobAccess(ctrl) + cas := mock.NewMockBlobAccess(ctrl) + + cas.EXPECT().Put(ctx, gomock.Any(), gomock.Any()).AnyTimes() + ac.EXPECT().Put(ctx, gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { + actionDigest = digest + m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000) + require.NoError(t, err) + actionResult = m.(*remoteexecution.ActionResult) + return nil + }) + + assetStore := storage.NewActionCacheAssetStore(ac, cas, 16*1024*1024) + + err = assetStore.Put(ctx, assetRef, assetData, instanceName) + require.NoError(t, err) + } + { + require.NotNil(t, actionResult) + + ac := mock.NewMockBlobAccess(ctrl) + cas := mock.NewMockBlobAccess(ctrl) + + ac.EXPECT().Get(ctx, gomock.Any()).DoAndReturn( + func(ctx context.Context, digest digest.Digest) buffer.Buffer { + if digest == actionDigest { + return buffer.NewProtoBufferFromProto(actionResult, buffer.UserProvided) + } + return buffer.NewBufferFromError(fmt.Errorf("not in AC")) + }) + + assetStore := storage.NewActionCacheAssetStore(ac, cas, 16*1024*1024) + + asset, err := assetStore.Get(ctx, assetRef, instanceName) + require.NoError(t, err) + require.Equal(t, asset.Digest, assetData.Digest) + } +} + +func TestActionCacheAssetStoreRoundTrip(t *testing.T) { + expectedDigest := &remoteexecution.Digest{ + Hash: "58de0f27ce00781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", + SizeBytes: 115, + } + uri := "https://example.com/example.txt" + assetRef := storage.NewAssetReference([]string{uri}, + []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) + + assetData := storage.NewBlobAsset(expectedDigest, timestamppb.Now()) + + roundTripTest(t, assetRef, assetData) +} + +func TestActionCacheAssetStoreRoundTripDirectory(t *testing.T) { + expectedDigest := &remoteexecution.Digest{ + Hash: 
"58de0f27ce00781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", + SizeBytes: 115, + } + uri := "https://example.com/example.txt" + assetRef := storage.NewAssetReference([]string{uri}, + []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) + + assetData := storage.NewAsset( + expectedDigest, + asset.Asset_DIRECTORY, + timestamppb.Now(), + ) + + roundTripTest(t, assetRef, assetData) +} + +func TestActionCacheAssetStoreRoundTripWithSpecialQualifiers(t *testing.T) { + expectedDigest := &remoteexecution.Digest{ + Hash: "58de0f27ce00781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", + SizeBytes: 115, + } + uri := "https://example.com/example.txt" + assetRef := storage.NewAssetReference([]string{uri}, + []*remoteasset.Qualifier{{Name: "resource_type", Value: "application/x-git"}}) + + assetData := storage.NewBlobAsset(expectedDigest, timestamppb.Now()) + + roundTripTest(t, assetRef, assetData) +} + func TestActionCacheAssetStoreGetBlob(t *testing.T) { ctrl, ctx := gomock.WithContext(context.Background(), t) @@ -163,7 +332,7 @@ func TestActionCacheAssetStoreGetBlob(t *testing.T) { actionDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "1543af664d856ac553f43cca0f61b3b948bafd6802308d67f42bbc09cd042218", + "b990c9040edaddd33b6e7506c23500cb4d9f699c780089aaa46c5d5f9479f74e", 140, ) @@ -200,7 +369,7 @@ func TestActionCacheAssetStoreGetDirectory(t *testing.T) { actionDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "1543af664d856ac553f43cca0f61b3b948bafd6802308d67f42bbc09cd042218", + "b990c9040edaddd33b6e7506c23500cb4d9f699c780089aaa46c5d5f9479f74e", 140, ) diff --git a/pkg/storage/asset.go b/pkg/storage/asset.go index 624e86f..0713b01 100644 --- a/pkg/storage/asset.go +++ b/pkg/storage/asset.go @@ -7,10 +7,21 @@ import ( ) // NewAsset creates a new Asset from request data. -func NewAsset(digest *remoteexecution.Digest, expireAt *timestamppb.Timestamp) *asset.Asset { +func NewAsset(digest *remoteexecution.Digest, type_ asset.Asset_AssetType, expireAt *timestamppb.Timestamp) *asset.Asset { return &asset.Asset{ Digest: digest, ExpireAt: expireAt, LastUpdated: timestamppb.Now(), + Type: type_, } } + +// NewBlobAsset creates a new Asset (type Blob) from request data. +func NewBlobAsset(digest *remoteexecution.Digest, expireAt *timestamppb.Timestamp) *asset.Asset { + return NewAsset(digest, asset.Asset_BLOB, expireAt) +} + +// NewDirectoryAsset creates a new Asset (type Directory) from request data. 
+func NewDirectoryAsset(digest *remoteexecution.Digest, expireAt *timestamppb.Timestamp) *asset.Asset { + return NewAsset(digest, asset.Asset_DIRECTORY, expireAt) +} diff --git a/pkg/storage/asset_reference.go b/pkg/storage/asset_reference.go index 866875e..a294364 100644 --- a/pkg/storage/asset_reference.go +++ b/pkg/storage/asset_reference.go @@ -47,8 +47,9 @@ func AssetReferenceToDigest(ar *asset.AssetReference, instance digest.InstanceNa func assetReferenceToAction(ar *asset.AssetReference, directoryDigest *remoteexecution.Digest) (*remoteexecution.Action, *remoteexecution.Command, error) { command := &remoteexecution.Command{ - Arguments: ar.Uris, - OutputPaths: []string{"out"}, + Arguments: ar.Uris, + OutputPaths: []string{"out"}, + OutputDirectoryFormat: remoteexecution.Command_DIRECTORY_ONLY, } commandDigest, err := ProtoToDigest(command) if err != nil { diff --git a/pkg/storage/authorizing_asset_store_test.go b/pkg/storage/authorizing_asset_store_test.go index b41de9c..e5306f2 100644 --- a/pkg/storage/authorizing_asset_store_test.go +++ b/pkg/storage/authorizing_asset_store_test.go @@ -25,7 +25,7 @@ func TestAuthorizingBlobAccessGet(t *testing.T) { blobDigest := &remoteexecution.Digest{Hash: "b27cad931e1ef0a520887464127055ffd6db82c7b36bfea5cd832db65b8f816b", SizeBytes: 24} uri := "https://raapi.test/blob" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}) - assetData := storage.NewAsset(blobDigest, timestamppb.Now()) + assetData := storage.NewBlobAsset(blobDigest, timestamppb.Now()) baseStore := mock.NewMockAssetStore(ctrl) fetchAuthorizer := mock.NewMockAuthorizer(ctrl) @@ -58,7 +58,7 @@ func TestAuthorizingBlobAccessPut(t *testing.T) { blobDigest := &remoteexecution.Digest{Hash: "b27cad931e1ef0a520887464127055ffd6db82c7b36bfea5cd832db65b8f816b", SizeBytes: 24} uri := "https://raapi.test/blob" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}) - assetData := storage.NewAsset(blobDigest, timestamppb.Now()) + assetData := storage.NewBlobAsset(blobDigest, timestamppb.Now()) baseStore := mock.NewMockAssetStore(ctrl) fetchAuthorizer := mock.NewMockAuthorizer(ctrl) diff --git a/pkg/storage/blob_access_asset_store_test.go b/pkg/storage/blob_access_asset_store_test.go index f6dbdef..92bb72d 100644 --- a/pkg/storage/blob_access_asset_store_test.go +++ b/pkg/storage/blob_access_asset_store_test.go @@ -26,7 +26,7 @@ func TestBlobAccessAssetStorePut(t *testing.T) { blobDigest := &remoteexecution.Digest{Hash: "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", SizeBytes: 111} uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{}) - assetData := storage.NewAsset(blobDigest, timestamppb.Now()) + assetData := storage.NewBlobAsset(blobDigest, timestamppb.Now()) refDigest, err := storage.AssetReferenceToDigest(assetRef, instanceName) require.NoError(t, err)
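
The sketch below is not part of the patch; it is a minimal, self-contained Go illustration of the encoding this change introduces in actionCacheAssetStore.Put: a directory asset is recorded as an OutputDirectory whose RootDirectoryDigest is the asset digest (no TreeDigest is computed), while a blob asset keeps the OutputFile encoding. The helper name encodeAssetDigest and the isDirectory flag are illustrative assumptions, not identifiers from the patch.

package main

import (
	"fmt"

	remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
)

// encodeAssetDigest shows how an asset digest could be embedded in an
// ActionResult under the scheme used by this patch. The output path "out"
// matches the path used by the asset store.
func encodeAssetDigest(d *remoteexecution.Digest, isDirectory bool) *remoteexecution.ActionResult {
	result := &remoteexecution.ActionResult{}
	if isDirectory {
		// The digest is treated as an opaque reference to the root
		// Directory proto; no Tree message is built or digested.
		result.OutputDirectories = []*remoteexecution.OutputDirectory{{
			Path:                "out",
			RootDirectoryDigest: d,
		}}
	} else {
		result.OutputFiles = []*remoteexecution.OutputFile{{
			Path:   "out",
			Digest: d,
		}}
	}
	return result
}

func main() {
	// Placeholder digest for illustration only.
	d := &remoteexecution.Digest{
		Hash:      "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643",
		SizeBytes: 111,
	}
	fmt.Println(encodeAssetDigest(d, true))
}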