Skip to content

Commit

Permalink
AC: Treat Directories as blobs
Browse files Browse the repository at this point in the history
We currently do some cleverness to put Directory objects into the
OutputDirectories field of the ActionResult for semantic purposes.
Unfortunately, doing this requires jumping over many hurdles, partly of our
own making and partly from REAPI.

To detect whether the object is a Directory, we fetch it from CAS and attempt to
unmarshal it into a Directory message.  We then must convert it to a Tree to
store in the ActionResult, which also requires the raw proto.

The catch is that things that are not Directory messages may successfully
unmarshal into one -- as a motivating example, BuildStream's Source proto is
such a message.  When this happens, we're very likely to fail the request as we
attempt to use the corrupted Directory we've created.

To solve this, we could pass through data on whether the asset is supposed to be
a directory or not; however, doing so is inelegant and has another subtle problem
-- if the client decides to use PushDirectory to push something that isn't a
Directory (which isn't explicitly banned by the spec), then we will return an error.

As the Action and ActionResults we generate are intended only to be used by us,
we can instead break the semantics somewhat and treat the Directory as an opaque
file, as we do for Blobs.

Note: this intentionally breaks the sharing of Actions should the
RemoteExecutionFetcher be used.  This feels more correct to me, as we were
overwriting the one we actually ran anyway.  Under this usage we instead get
multiple Actions pointing to the same ActionResult -- one for the actually
executed Action, and another for the asset reference.
  • Loading branch information
tomcoldrick-ct authored and harrysarson committed Jul 9, 2024
1 parent d7a9200 commit f0b6649
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 199 deletions.
2 changes: 0 additions & 2 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ go_library(
"@com_github_buildbarn_bb_storage//pkg/blobstore",
"@com_github_buildbarn_bb_storage//pkg/blobstore/buffer",
"@com_github_buildbarn_bb_storage//pkg/digest",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//proto",
"@org_golang_google_protobuf//types/known/timestamppb",
],
Expand Down
162 changes: 13 additions & 149 deletions pkg/storage/action_cache_asset_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/digest"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand All @@ -32,59 +30,13 @@ func NewActionCacheAssetStore(actionCache, contentAddressableStorage blobstore.B
}
}

// assetToDirectory reads the blob identified by the asset's digest from the
// content addressable storage and unmarshals it as a Directory message.
//
// The digest function is selected from the instance name and the length of
// the asset's hash. Any failure while selecting the digest function,
// converting the digest, or reading/unmarshalling the blob is returned to the
// caller unchanged.
func (rs *actionCacheAssetStore) assetToDirectory(ctx context.Context, asset *asset.Asset, instance digest.InstanceName) (*remoteexecution.Directory, error) {
	assetDigest := asset.Digest
	hashFn, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(assetDigest.GetHash()))
	if err != nil {
		return nil, err
	}

	casDigest, err := hashFn.NewDigestFromProto(assetDigest)
	if err != nil {
		return nil, err
	}

	// Fetch the raw blob first, then decode it, bounded by the configured
	// maximum message size.
	blob := rs.contentAddressableStorage.Get(ctx, casDigest)
	msg, err := blob.ToProto(&remoteexecution.Directory{}, rs.maximumMessageSizeBytes)
	if err != nil {
		return nil, err
	}
	return msg.(*remoteexecution.Directory), nil
}

func (rs *actionCacheAssetStore) actionResultToAsset(ctx context.Context, a *remoteexecution.ActionResult, instance digest.InstanceName) (*asset.Asset, error) {
digest := &remoteexecution.Digest{}
// Check if there is an output directory in the action result
for _, dir := range a.OutputDirectories {
if dir.Path == "out" {
digest = dir.TreeDigest
}
}
// If the required output directory is present
if digest.Hash != "" {
digestFunction, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(digest.Hash))
if err != nil {
return nil, err
}
treeDigest, err := digestFunction.NewDigestFromProto(digest)
if err != nil {
return nil, err
}
// The action result contains a tree digest, but a directory digest
// is needed, so retrieve the tree message and get the digest of the
// root
tree, err := rs.contentAddressableStorage.Get(ctx, treeDigest).ToProto(&remoteexecution.Tree{}, rs.maximumMessageSizeBytes)
if err != nil {
return nil, err
}
root := tree.(*remoteexecution.Tree).Root
digest, err = ProtoToDigest(root)
if err != nil {
return nil, err
}
} else {
// Required output directory is not present, look for required
// output file
for _, file := range a.OutputFiles {
if file.Path == "out" {
digest = file.Digest
}
// Required output directory is not present, look for required
// output file
for _, file := range a.OutputFiles {
if file.Path == "out" {
digest = file.Digest
}
}
return &asset.Asset{
Expand Down Expand Up @@ -204,22 +156,10 @@ func (rs *actionCacheAssetStore) Put(ctx context.Context, ref *asset.AssetRefere
}
var action *remoteexecution.Action
var command *remoteexecution.Command
if commandGenerator, err := qualifier.QualifiersToCommand(ref.Qualifiers); err != nil || len(ref.Uris) > 1 {
// Create the action with the qualifier directory as the input root
action, command, err = assetReferenceToAction(ref, directoryDigest)
if err != nil {
return err
}
} else {
command = commandGenerator(ref.Uris[0])
commandDigest, err := ProtoToDigest(command)
if err != nil {
return err
}
action = &remoteexecution.Action{
CommandDigest: commandDigest,
InputRootDigest: EmptyDigest,
}
// Create the action with the qualifier directory as the input root
action, command, err = assetReferenceToAction(ref, directoryDigest)
if err != nil {
return err
}
actionPb, err := proto.Marshal(action)
if err != nil {
Expand Down Expand Up @@ -256,93 +196,17 @@ func (rs *actionCacheAssetStore) Put(ctx context.Context, ref *asset.AssetRefere
}

actionResult := &remoteexecution.ActionResult{
OutputFiles: []*remoteexecution.OutputFile{{
Path: "out",
Digest: data.Digest,
}},
ExecutionMetadata: &remoteexecution.ExecutedActionMetadata{
QueuedTimestamp: data.LastUpdated,
},
}

// Check if the input asset is a directory or blob
d, err := rs.assetToDirectory(ctx, data, instance)
if err == nil {
// If it is a directory, construct a tree from it as tree digest is
// required for action result
tree, err := rs.directoryToTree(ctx, d, instance)
if err != nil {
return err
}
treePb, err := proto.Marshal(tree)
if err != nil {
return err
}
treeDigest, err := ProtoToDigest(tree)
if err != nil {
return err
}
bbTreeDigest, err := digestFunction.NewDigestFromProto(treeDigest)
if err != nil {
return err
}
err = rs.contentAddressableStorage.Put(ctx, bbTreeDigest, buffer.NewCASBufferFromByteSlice(bbTreeDigest, treePb, buffer.UserProvided))
if err != nil {
return err
}
actionResult.OutputDirectories = []*remoteexecution.OutputDirectory{{
Path: "out",
TreeDigest: treeDigest,
}}
} else {
// If it isn't a directory, use the digest as an output file digest
if status.Code(err) != codes.InvalidArgument {
return err
}
actionResult.OutputFiles = []*remoteexecution.OutputFile{{
Path: "out",
Digest: data.Digest,
}}
}
return rs.actionCache.Put(ctx, bbActionDigest, buffer.NewProtoBufferFromProto(actionResult, buffer.UserProvided))
}

// directoryToTree builds a Tree message whose Root is the given directory and
// whose Children are the directories collected by directoryNodeToDirectories
// for each of the root's DirectoryNodes, in iteration order.
//
// The first error encountered while collecting children aborts the
// construction and is returned as-is.
func (rs *actionCacheAssetStore) directoryToTree(ctx context.Context, directory *remoteexecution.Directory, instance digest.InstanceName) (*remoteexecution.Tree, error) {
	tree := &remoteexecution.Tree{
		Root:     directory,
		Children: []*remoteexecution.Directory{},
	}
	for _, child := range directory.Directories {
		descendants, err := rs.directoryNodeToDirectories(ctx, instance, child)
		if err != nil {
			return nil, err
		}
		tree.Children = append(tree.Children, descendants...)
	}
	return tree, nil
}

// directoryNodeToDirectories resolves a DirectoryNode to the Directory message
// it references in the content addressable storage, then recursively resolves
// every DirectoryNode listed in that Directory.
//
// The returned slice starts with the Directory for node itself, followed by
// the results for each of its subdirectories in order. Any error from digest
// handling or from the storage lookup is returned unchanged.
func (rs *actionCacheAssetStore) directoryNodeToDirectories(ctx context.Context, instance digest.InstanceName, node *remoteexecution.DirectoryNode) ([]*remoteexecution.Directory, error) {
	hashFn, err := instance.GetDigestFunction(remoteexecution.DigestFunction_UNKNOWN, len(node.GetDigest().GetHash()))
	if err != nil {
		return nil, err
	}
	nodeDigest, err := hashFn.NewDigestFromProto(node.Digest)
	if err != nil {
		return nil, err
	}
	msg, err := rs.contentAddressableStorage.Get(ctx, nodeDigest).ToProto(&remoteexecution.Directory{}, rs.maximumMessageSizeBytes)
	if err != nil {
		return nil, err
	}

	// Assert the concrete type once and reuse it for both the result and
	// the recursion below.
	current := msg.(*remoteexecution.Directory)
	collected := []*remoteexecution.Directory{current}
	for _, sub := range current.Directories {
		nested, err := rs.directoryNodeToDirectories(ctx, instance, sub)
		if err != nil {
			return nil, err
		}
		collected = append(collected, nested...)
	}
	return collected, nil
}

// getDefaultTimestamp returns a protobuf Timestamp for the Unix epoch
// (zero seconds, zero nanoseconds).
func getDefaultTimestamp() *timestamppb.Timestamp {
	epoch := time.Unix(0, 0)
	return timestamppb.New(epoch)
}
56 changes: 8 additions & 48 deletions pkg/storage/action_cache_asset_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ func TestActionCacheAssetStorePutBlob(t *testing.T) {
Hash: "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643",
SizeBytes: 111,
}
bbBlobDigest := digest.MustNewDigest(
"",
remoteexecution.DigestFunction_SHA256,
blobDigest.Hash,
blobDigest.SizeBytes,
)
uri := "https://example.com/example.txt"
assetRef := storage.NewAssetReference([]string{uri},
[]*remoteasset.Qualifier{{Name: "test", Value: "test"}})
Expand Down Expand Up @@ -70,8 +64,6 @@ func TestActionCacheAssetStorePutBlob(t *testing.T) {
cas.EXPECT().Put(ctx, directoryDigest, gomock.Any()).Return(nil)
cas.EXPECT().Put(ctx, actionDigest, gomock.Any()).Return(nil)
cas.EXPECT().Put(ctx, commandDigest, gomock.Any()).Return(nil)
cas.EXPECT().Get(ctx, bbBlobDigest).Return(
buffer.NewValidatedBufferFromByteSlice([]byte("Hello")))
ac.EXPECT().Put(ctx, actionDigest, gomock.Any()).DoAndReturn(
func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000)
Expand Down Expand Up @@ -101,12 +93,6 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) {
Hash: "58de0f27ce0f781e5c109f18b0ee6905bdf64f2b1009e225ac67a27f656a0643",
SizeBytes: 111,
}
bbRootDirectoryDigest := digest.MustNewDigest(
"",
remoteexecution.DigestFunction_SHA256,
rootDirectoryDigest.Hash,
rootDirectoryDigest.SizeBytes,
)
uri := "https://example.com/example.txt"
assetRef := storage.NewAssetReference([]string{uri},
[]*remoteasset.Qualifier{{Name: "test", Value: "test"}})
Expand Down Expand Up @@ -136,35 +122,21 @@ func TestActionCacheAssetStorePutDirectory(t *testing.T) {
"e6842def39984b212641b9796c162b9e3085da84257bae614418f2255b0addc5",
38,
)
bbTreeDigest := digest.MustNewDigest(
"",
remoteexecution.DigestFunction_SHA256,
"102b51b9765a56a3e899f7cf0ee38e5251f9c503b357b330a49183eb7b155604",
2,
)
treeDigest := &remoteexecution.Digest{
Hash: "102b51b9765a56a3e899f7cf0ee38e5251f9c503b357b330a49183eb7b155604",
SizeBytes: 2,
}

ac := mock.NewMockBlobAccess(ctrl)
cas := mock.NewMockBlobAccess(ctrl)
cas.EXPECT().Put(ctx, refDigest, gomock.Any()).Return(nil)
cas.EXPECT().Put(ctx, directoryDigest, gomock.Any()).Return(nil)
cas.EXPECT().Put(ctx, actionDigest, gomock.Any()).Return(nil)
cas.EXPECT().Put(ctx, commandDigest, gomock.Any()).Return(nil)
cas.EXPECT().Get(ctx, bbRootDirectoryDigest).Return(
buffer.NewProtoBufferFromProto(&remoteexecution.Directory{},
buffer.UserProvided))
cas.EXPECT().Put(ctx, bbTreeDigest, gomock.Any()).Return(nil)
ac.EXPECT().Put(ctx, actionDigest, gomock.Any()).DoAndReturn(
func(ctx context.Context, digest digest.Digest, b buffer.Buffer) error {
m, err := b.ToProto(&remoteexecution.ActionResult{}, 1000)
require.NoError(t, err)
a := m.(*remoteexecution.ActionResult)
for _, d := range a.OutputDirectories {
for _, d := range a.OutputFiles {
if d.Path == "out" {
require.True(t, proto.Equal(d.TreeDigest, treeDigest))
require.True(t, proto.Equal(d.Digest, rootDirectoryDigest))
return nil
}
}
Expand Down Expand Up @@ -222,16 +194,6 @@ func TestActionCacheAssetStoreGetDirectory(t *testing.T) {

instanceName := digest.MustNewInstanceName("")

treeDigest := &remoteexecution.Digest{
Hash: "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
SizeBytes: 222,
}
bbTreeDigest := digest.MustNewDigest(
"",
remoteexecution.DigestFunction_SHA256,
"aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
222,
)
uri := "https://example.com/example.txt"
assetRef := storage.NewAssetReference([]string{uri},
[]*remoteasset.Qualifier{})
Expand All @@ -244,25 +206,23 @@ func TestActionCacheAssetStoreGetDirectory(t *testing.T) {

ts := timestamppb.New(time.Unix(0, 0))
buf := buffer.NewProtoBufferFromProto(&remoteexecution.ActionResult{
OutputDirectories: []*remoteexecution.OutputDirectory{
OutputFiles: []*remoteexecution.OutputFile{
{
Path: "out",
TreeDigest: treeDigest,
Path: "out",
Digest: &remoteexecution.Digest{
Hash: "aec070645fe53ee3b3763059376134f058cc337247c978add178b6ccdfb0019f",
SizeBytes: 222,
},
},
},
ExecutionMetadata: &remoteexecution.ExecutedActionMetadata{
QueuedTimestamp: ts,
},
}, buffer.UserProvided)

treeBuf := buffer.NewProtoBufferFromProto(&remoteexecution.Tree{
Root: &remoteexecution.Directory{},
}, buffer.UserProvided)

ac := mock.NewMockBlobAccess(ctrl)
cas := mock.NewMockBlobAccess(ctrl)
ac.EXPECT().Get(ctx, actionDigest).Return(buf)
cas.EXPECT().Get(ctx, bbTreeDigest).Return(treeBuf)
assetStore := storage.NewActionCacheAssetStore(ac, cas, 16*1024*1024)

_, err := assetStore.Get(ctx, assetRef, instanceName)
Expand Down

0 comments on commit f0b6649

Please sign in to comment.