diff --git a/pkg/qualifier/qualifier_translator.go b/pkg/qualifier/qualifier_translator.go index a093ad2..166c3da 100644 --- a/pkg/qualifier/qualifier_translator.go +++ b/pkg/qualifier/qualifier_translator.go @@ -55,7 +55,7 @@ func gitCommand(qualifiers map[string]string) func(string) *remoteexecution.Comm return &remoteexecution.Command{ Arguments: []string{"sh", "-c", script}, OutputPaths: []string{"out"}, - OutputDirectoryFormat: remoteexecution.Command_DIRECTORY_ONLY, + OutputDirectoryFormat: remoteexecution.Command_TREE_AND_DIRECTORY, } } } @@ -81,7 +81,7 @@ func octetStreamCommand(qualifiers map[string]string) func(string) *remoteexecut return &remoteexecution.Command{ Arguments: []string{"sh", "-c", script}, OutputPaths: []string{"out"}, - OutputDirectoryFormat: remoteexecution.Command_DIRECTORY_ONLY, + OutputDirectoryFormat: remoteexecution.Command_TREE_AND_DIRECTORY, } } } diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index 915588d..ea21315 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -39,6 +39,7 @@ go_test( ":storage", "//internal/mock", "//pkg/proto/asset", + "//pkg/storage/blobstore", "@com_github_bazelbuild_remote_apis//build/bazel/remote/asset/v1:asset", "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:execution", "@com_github_buildbarn_bb_storage//pkg/blobstore/buffer", diff --git a/pkg/storage/action_cache_asset_store.go b/pkg/storage/action_cache_asset_store.go index e296e71..9d3bcc5 100644 --- a/pkg/storage/action_cache_asset_store.go +++ b/pkg/storage/action_cache_asset_store.go @@ -31,6 +31,22 @@ func NewActionCacheAssetStore(actionCache, contentAddressableStorage blobstore.B } } +func (rs *actionCacheAssetStore) assetToDirectory(ctx context.Context, asset *asset.Asset, instance digest.InstanceName) (*remoteexecution.Directory, error) { + digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(asset.Digest.GetHash())) + if err != nil { + return 
nil, err + } + digest, err := digestFunction.NewDigestFromProto(asset.Digest) + if err != nil { + return nil, err + } + directory, err := rs.contentAddressableStorage.Get(ctx, digest).ToProto(&remoteexecution.Directory{}, rs.maximumMessageSizeBytes) + if err != nil { + return nil, err + } + return directory.(*remoteexecution.Directory), nil +} + func (rs *actionCacheAssetStore) actionResultToAsset(ctx context.Context, a *remoteexecution.ActionResult, instance digest.InstanceName) (*asset.Asset, error) { digest := &remoteexecution.Digest{} assetType := asset.Asset_DIRECTORY @@ -233,10 +249,43 @@ func (rs *actionCacheAssetStore) Put(ctx context.Context, ref *asset.AssetRefere } if data.Type == asset.Asset_DIRECTORY { + d, err := rs.assetToDirectory(ctx, data, instance) + + if err != nil { + // Users will hit this if they upload a digest referencing an + // arbitrary Blob in `PushDirectory` or a digest that does not + // reference any blob at all. + return fmt.Errorf("digest in directory asset does not reference a Directory: %v", err) + } + + // If it is a directory, construct a tree from it as tree digest is + // required for action result + tree, err := rs.directoryToTree(ctx, d, instance) + if err != nil { + return err + } + treePb, err := proto.Marshal(tree) + if err != nil { + return err + } + treeDigest, err := ProtoToDigest(tree) + if err != nil { + return err + } + bbTreeDigest, err := digestFunction.NewDigestFromProto(treeDigest) + if err != nil { + return err + } + err = rs.contentAddressableStorage.Put(ctx, bbTreeDigest, buffer.NewCASBufferFromByteSlice(bbTreeDigest, treePb, buffer.UserProvided)) + if err != nil { + return err + } + // Use digest as a root directory digest actionResult.OutputDirectories = []*remoteexecution.OutputDirectory{{ Path: "out", RootDirectoryDigest: data.Digest, + TreeDigest: treeDigest, }} } else if data.Type == asset.Asset_BLOB { // Use the digest as an output file digest @@ -251,6 +300,46 @@ func (rs *actionCacheAssetStore) 
Put(ctx context.Context, ref *asset.AssetRefere return rs.actionCache.Put(ctx, bbActionDigest, buffer.NewProtoBufferFromProto(actionResult, buffer.UserProvided)) } +func (rs *actionCacheAssetStore) directoryToTree(ctx context.Context, directory *remoteexecution.Directory, instance digest.InstanceName) (*remoteexecution.Tree, error) { + children := []*remoteexecution.Directory{} + for _, node := range directory.Directories { + nodeChildren, err := rs.directoryNodeToDirectories(ctx, instance, node) + if err != nil { + return nil, err + } + children = append(children, nodeChildren...) + } + + return &remoteexecution.Tree{ + Root: directory, + Children: children, + }, nil +} + +func (rs *actionCacheAssetStore) directoryNodeToDirectories(ctx context.Context, instance digest.InstanceName, node *remoteexecution.DirectoryNode) ([]*remoteexecution.Directory, error) { + digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(node.GetDigest().GetHash())) + if err != nil { + return nil, err + } + digest, err := digestFunction.NewDigestFromProto(node.Digest) + if err != nil { + return nil, err + } + directory, err := rs.contentAddressableStorage.Get(ctx, digest).ToProto(&remoteexecution.Directory{}, rs.maximumMessageSizeBytes) + if err != nil { + return nil, err + } + directories := []*remoteexecution.Directory{directory.(*remoteexecution.Directory)} + for _, node := range directory.(*remoteexecution.Directory).Directories { + children, err := rs.directoryNodeToDirectories(ctx, instance, node) + if err != nil { + return nil, err + } + directories = append(directories, children...) 
+ } + return directories, nil +} + func getDefaultTimestamp() *timestamppb.Timestamp { return timestamppb.New(time.Unix(0, 0)) } diff --git a/pkg/storage/action_cache_asset_store_test.go b/pkg/storage/action_cache_asset_store_test.go index 0bbd92f..11b8249 100644 --- a/pkg/storage/action_cache_asset_store_test.go +++ b/pkg/storage/action_cache_asset_store_test.go @@ -22,6 +22,15 @@ import ( "google.golang.org/grpc/status" ) +func bbDigest(d *remoteexecution.Digest) digest.Digest { + return digest.MustNewDigest( + "", + remoteexecution.DigestFunction_SHA256, + d.Hash, + d.SizeBytes, + ) +} + func TestActionCacheAssetStorePutBlob(t *testing.T) { ctrl, ctx := gomock.WithContext(context.Background(), t) @@ -50,13 +59,13 @@ func TestActionCacheAssetStorePutBlob(t *testing.T) { actionDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "b1b0b54caccd4235e968061f380649a9720ee67909fd1027200230eba93a2427", + "80fa2440711e986859b84bb5bc5f63b3a9987aa498a4019824a7bed622593f6e", 140, ) commandDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "7b9d720c2fbc4e4c0fc9780208eae45f4d2c4dc23350c53dc1050259827ca459", + "3c169433e9fad318ccf601d327685f95941ce93408fc0d21f92452844564d123", 40, ) @@ -73,7 +82,7 @@ func TestActionCacheAssetStorePutBlob(t *testing.T) { a := m.(*remoteexecution.ActionResult) for _, f := range a.OutputFiles { if f.Path == "out" { - require.True(t, proto.Equal(f.Digest, blobDigest)) + require.True(t, proto.Equal(f.Digest, blobDigest), "Got %v", f.Digest) return nil } } @@ -95,6 +104,12 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { Hash: "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", SizeBytes: 111, } + bbRootDirectoryDigest := digest.MustNewDigest( + "", + remoteexecution.DigestFunction_SHA256, + rootDirectoryDigest.Hash, + rootDirectoryDigest.SizeBytes, + ) uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, 
[]*remoteasset.Qualifier{{Name: "test", Value: "test"}}) @@ -118,15 +133,25 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { actionDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "b1b0b54caccd4235e968061f380649a9720ee67909fd1027200230eba93a2427", + "80fa2440711e986859b84bb5bc5f63b3a9987aa498a4019824a7bed622593f6e", 140, ) commandDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "7b9d720c2fbc4e4c0fc9780208eae45f4d2c4dc23350c53dc1050259827ca459", + "3c169433e9fad318ccf601d327685f95941ce93408fc0d21f92452844564d123", 40, ) + bbTreeDigest := digest.MustNewDigest( + "", + remoteexecution.DigestFunction_SHA256, + "102b51b9765a56a3e899f7cf0ee38e5251f9c503b357b330a49183eb7b155604", + 2, + ) + treeDigest := &remoteexecution.Digest{ + Hash: "102b51b9765a56a3e899f7cf0ee38e5251f9c503b357b330a49183eb7b155604", + SizeBytes: 2, + } ac := mock.NewMockBlobAccess(ctrl) cas := mock.NewMockBlobAccess(ctrl) @@ -134,6 +159,12 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { cas.EXPECT().Put(ctx, directoryDigest, gomock.Any()).Return(nil) cas.EXPECT().Put(ctx, actionDigest, gomock.Any()).Return(nil) cas.EXPECT().Put(ctx, commandDigest, gomock.Any()).Return(nil) + + cas.EXPECT().Get(ctx, bbRootDirectoryDigest).Return( + buffer.NewProtoBufferFromProto(&remoteexecution.Directory{}, + buffer.UserProvided)) + cas.EXPECT().Put(ctx, bbTreeDigest, gomock.Any()).Return(nil) + ac.EXPECT().Put(ctx, actionDigest, gomock.Any()).DoAndReturn( func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000) @@ -141,7 +172,8 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) { a := m.(*remoteexecution.ActionResult) for _, d := range a.OutputDirectories { if d.Path == "out" { - require.True(t, proto.Equal(d.RootDirectoryDigest, rootDirectoryDigest)) + require.True(t, proto.Equal(d.TreeDigest, treeDigest), "Got %v", d.TreeDigest) + require.True(t, 
proto.Equal(d.RootDirectoryDigest, rootDirectoryDigest), "Got %v", d.RootDirectoryDigest) return nil } } @@ -163,6 +195,12 @@ func TestActionCacheAssetStorePutMalformedDirectory(t *testing.T) { Hash: "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", SizeBytes: 111, } + bbRootDirectoryDigest := digest.MustNewDigest( + "", + remoteexecution.DigestFunction_SHA256, + rootDirectoryDigest.Hash, + rootDirectoryDigest.SizeBytes, + ) uri := "https://example.com/example.txt" assetRef := storage.NewAssetReference([]string{uri}, []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) @@ -171,45 +209,171 @@ func TestActionCacheAssetStorePutMalformedDirectory(t *testing.T) { asset.Asset_DIRECTORY, timestamppb.Now(), ) - refDigest := digest.MustNewDigest( - "", - remoteexecution.DigestFunction_SHA256, - "a2c2b32a289d4d9bf6e6309ed2691b6bcc04ee7923fcfd81bf1bfe0e7348139b", - 14, + + ac := mock.NewMockBlobAccess(ctrl) + cas := mock.NewMockBlobAccess(ctrl) + cas.EXPECT().Put(ctx, gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + cas.EXPECT().Get(ctx, bbRootDirectoryDigest).Return( + buffer.NewProtoBufferFromProto(&remoteexecution.Directory{ + Directories: []*remoteexecution.DirectoryNode{{ + Name: "this is a malformed directory noe", + Digest: nil, + }}, + }, + buffer.UserProvided)) + + assetStore := storage.NewActionCacheAssetStore(ac, cas, 16*1024*1024) + + err = assetStore.Put(ctx, assetRef, assetData, instanceName) + require.NotNil(t, err) +} + +func TestActionCacheAssetStorePutRecursiveDirectory(t *testing.T) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + instanceName, err := digest.NewInstanceName("") + require.NoError(t, err) + + sub1Digest := &remoteexecution.Digest{ + Hash: "94a72d7ae68d937c7d65ccc7310a97a11ce78a48850ff618fcbeba58c354e07d", + SizeBytes: 40, + } + sub2Digest := &remoteexecution.Digest{ + Hash: "1dc3fa2e0703bb64c17a3b0b4402c44a666ec8ac361e77bb526a65dea6d73bf0", + SizeBytes: 0, + } + + tree := &remoteexecution.Tree{ + 
Root: &remoteexecution.Directory{ + Directories: []*remoteexecution.DirectoryNode{ + { + Name: "sub1", + Digest: sub1Digest, + }, + { + Name: "sub2", + Digest: sub2Digest, + }, + }, + Files: []*remoteexecution.FileNode{ + { + Digest: &remoteexecution.Digest{ + Hash: "593dd41a19cddee5a67a5bcde0d2323199cc340fa64d6c24a22c5913960a6de2", + SizeBytes: 6, + }, + }, + }, + }, + Children: []*remoteexecution.Directory{ + { + Files: []*remoteexecution.FileNode{ + { + Digest: &remoteexecution.Digest{ + Hash: "12ca9a458e433b707f72d00c2aa659529fee2b9e97de9fe645281b3a13ac6ee9", + SizeBytes: 5, + }, + }, + }, + }, + {}, + }, + } + + rootDirectoryDigest, err := storage.ProtoToDigest(tree.Root) + require.NoError(t, err) + treeDigest, err := storage.ProtoToDigest(tree) + require.NoError(t, err) + + t.Logf("rootDirectoryDigest %v", rootDirectoryDigest) + t.Logf("treeDigest %v", treeDigest) + + uri := "https://example.com/example.txt" + assetRef := storage.NewAssetReference([]string{uri}, + []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) + assetData := storage.NewAsset( + rootDirectoryDigest, + asset.Asset_DIRECTORY, + timestamppb.Now(), ) - directoryDigest := digest.MustNewDigest( - "", - remoteexecution.DigestFunction_SHA256, - "c72e5e1e6ab54746d4fd3da7b443037187c81347a210d2ab8e5863638fbe1ac6", - 88, + + ac := mock.NewMockBlobAccess(ctrl) + cas := mock.NewMockBlobAccess(ctrl) + cas.EXPECT().Put(ctx, gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + cas.EXPECT().Get(ctx, bbDigest(rootDirectoryDigest)).Return( + buffer.NewProtoBufferFromProto( + tree.Root, + buffer.UserProvided, + )) + cas.EXPECT().Get(ctx, bbDigest(sub1Digest)).Return( + buffer.NewProtoBufferFromProto( + tree.Children[0], + buffer.UserProvided, + )) + cas.EXPECT().Get(ctx, bbDigest(sub2Digest)).Return( + buffer.NewProtoBufferFromProto( + tree.Children[1], + buffer.UserProvided, + )) + + ac.EXPECT().Put(ctx, gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, digest digest.Digest, b 
buffer.Buffer) error { + m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000) + require.NoError(t, err) + a := m.(*remoteexecution.ActionResult) + for _, d := range a.OutputDirectories { + if d.Path == "out" { + require.True(t, proto.Equal(d.TreeDigest, treeDigest), "Got %v", d.TreeDigest) + require.True(t, proto.Equal(d.RootDirectoryDigest, rootDirectoryDigest), "Got %v", d.RootDirectoryDigest) + return nil + } + } + return status.Error(codes.Internal, "Directory digest not found") + }) + + assetStore := storage.NewActionCacheAssetStore(ac, cas, 16*1024*1024) + + err = assetStore.Put(ctx, assetRef, assetData, instanceName) + require.NoError(t, err) +} + +func TestActionCacheAssetStorePutMalformedDirectoryAsBlob(t *testing.T) { + ctrl, ctx := gomock.WithContext(context.Background(), t) + + instanceName, err := digest.NewInstanceName("") + require.NoError(t, err) + + blobDigest := &remoteexecution.Digest{ + Hash: "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643", + SizeBytes: 111, + } + uri := "https://example.com/example.txt" + assetRef := storage.NewAssetReference([]string{uri}, + []*remoteasset.Qualifier{{Name: "test", Value: "test"}}) + assetData := storage.NewAsset( + blobDigest, + asset.Asset_BLOB, + timestamppb.Now(), ) actionDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "b1b0b54caccd4235e968061f380649a9720ee67909fd1027200230eba93a2427", + "80fa2440711e986859b84bb5bc5f63b3a9987aa498a4019824a7bed622593f6e", 140, ) - commandDigest := digest.MustNewDigest( - "", - remoteexecution.DigestFunction_SHA256, - "7b9d720c2fbc4e4c0fc9780208eae45f4d2c4dc23350c53dc1050259827ca459", - 40, - ) ac := mock.NewMockBlobAccess(ctrl) cas := mock.NewMockBlobAccess(ctrl) - cas.EXPECT().Put(ctx, refDigest, gomock.Any()).Return(nil) - cas.EXPECT().Put(ctx, directoryDigest, gomock.Any()).Return(nil) - cas.EXPECT().Put(ctx, actionDigest, gomock.Any()).Return(nil) - cas.EXPECT().Put(ctx, commandDigest, gomock.Any()).Return(nil) + 
cas.EXPECT().Put(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(4) ac.EXPECT().Put(ctx, actionDigest, gomock.Any()).DoAndReturn( func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000) require.NoError(t, err) a := m.(*remoteexecution.ActionResult) - for _, d := range a.OutputDirectories { + for _, d := range a.OutputFiles { if d.Path == "out" { - require.True(t, proto.Equal(d.RootDirectoryDigest, rootDirectoryDigest)) + require.True(t, proto.Equal(d.Digest, blobDigest), "Got %v", d.Digest) return nil } } @@ -235,6 +399,13 @@ func roundTripTest(t *testing.T, assetRef *asset.AssetReference, assetData *asse cas := mock.NewMockBlobAccess(ctrl) cas.EXPECT().Put(ctx, gomock.Any(), gomock.Any()).AnyTimes() + + if assetData.Type == asset.Asset_DIRECTORY { + cas.EXPECT().Get(ctx, gomock.Any()).Return( + buffer.NewProtoBufferFromProto(&remoteexecution.Directory{}, + buffer.UserProvided)) + } + ac.EXPECT().Put(ctx, gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error { actionDigest = digest @@ -332,7 +503,7 @@ func TestActionCacheAssetStoreGetBlob(t *testing.T) { actionDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "b990c9040edaddd33b6e7506c23500cb4d9f699c780089aaa46c5d5f9479f74e", + "7bef991fed17d0a31d1ea1b536f2ac865e567e34ebfa2bfc081d3672110b93be", 140, ) @@ -369,7 +540,7 @@ func TestActionCacheAssetStoreGetDirectory(t *testing.T) { actionDigest := digest.MustNewDigest( "", remoteexecution.DigestFunction_SHA256, - "b990c9040edaddd33b6e7506c23500cb4d9f699c780089aaa46c5d5f9479f74e", + "7bef991fed17d0a31d1ea1b536f2ac865e567e34ebfa2bfc081d3672110b93be", 140, ) diff --git a/pkg/storage/asset_reference.go b/pkg/storage/asset_reference.go index a294364..2d94814 100644 --- a/pkg/storage/asset_reference.go +++ b/pkg/storage/asset_reference.go @@ -49,7 +49,7 @@ func assetReferenceToAction(ar *asset.AssetReference, 
directoryDigest *remoteexe command := &remoteexecution.Command{ Arguments: ar.Uris, OutputPaths: []string{"out"}, - OutputDirectoryFormat: remoteexecution.Command_DIRECTORY_ONLY, + OutputDirectoryFormat: remoteexecution.Command_TREE_AND_DIRECTORY, } commandDigest, err := ProtoToDigest(command) if err != nil {