Skip to content

Commit

Permalink
Enrich repositories in graveler with storage ID
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-har committed Feb 26, 2025
1 parent 2ec77bd commit 364c903
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 23 deletions.
5 changes: 4 additions & 1 deletion pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {

storeLimiter := kv.NewStoreLimiter(cfg.KVStore, limiter)
addressProvider := ident.NewHexAddressProvider()

refManager := ref.NewRefManager(
ref.ManagerConfig{
Executor: executor,
Expand All @@ -363,7 +364,9 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
CommitCacheConfig: ref.CacheConfig(baseCfg.Graveler.CommitCache),
MaxBatchDelay: baseCfg.Graveler.MaxBatchDelay,
BranchApproximateOwnershipParams: makeBranchApproximateOwnershipParams(baseCfg.Graveler.BranchOwnership),
})
},
cfg.Config.StorageConfig(),
)
gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, baseCfg.Committed.BlockStoragePrefix)
settingManager := settings.NewManager(refManager, cfg.KVStore)
if cfg.SettingsManagerOption != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ type AdapterConfig interface {
BlockstoreAzureParams() (blockparams.Azure, error)
GetDefaultNamespacePrefix() *string
IsBackwardsCompatible() bool
ID() string
}

type Blockstore struct {
Expand Down Expand Up @@ -357,6 +358,10 @@ func (b *Blockstore) IsBackwardsCompatible() bool {
return false
}

func (b *Blockstore) ID() string {
return SingleBlockstoreID
}

func (b *Blockstore) SigningKey() SecureString {
return b.Signing.SecretKey
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/graveler/ref/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/treeverse/lakefs/pkg/batch"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/graveler/ref"
"github.com/treeverse/lakefs/pkg/ident"
Expand Down Expand Up @@ -41,7 +42,7 @@ func testRefManager(t testing.TB) (graveler.RefManager, kv.Store) {
RepositoryCacheConfig: testRepoCacheConfig,
CommitCacheConfig: testCommitCacheConfig,
}
return ref.NewRefManager(cfg), kvStore
return ref.NewRefManager(cfg, NewStorageConfigMock(config.SingleBlockstoreID)), kvStore
}

func testRefManagerWithAddressProvider(t testing.TB, addressProvider ident.AddressProvider) (graveler.RefManager, kv.Store) {
Expand All @@ -55,7 +56,7 @@ func testRefManagerWithAddressProvider(t testing.TB, addressProvider ident.Addre
RepositoryCacheConfig: testRepoCacheConfig,
CommitCacheConfig: testCommitCacheConfig,
}
return ref.NewRefManager(cfg), kvStore
return ref.NewRefManager(cfg, NewStorageConfigMock(config.SingleBlockstoreID)), kvStore
}

func TestMain(m *testing.M) {
Expand Down
14 changes: 12 additions & 2 deletions pkg/graveler/ref/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/treeverse/lakefs/pkg/batch"
"github.com/treeverse/lakefs/pkg/cache"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/distributed"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/httputil"
Expand Down Expand Up @@ -40,6 +41,7 @@ type Manager struct {
commitCache cache.Cache
maxBatchDelay time.Duration
branchOwnership *distributed.MostlyCorrectOwner
storageConfig config.StorageConfig
}

func branchFromProto(pb *graveler.BranchData) *graveler.Branch {
Expand Down Expand Up @@ -107,7 +109,7 @@ type ManagerConfig struct {
BranchApproximateOwnershipParams BranchApproximateOwnershipParams
}

func NewRefManager(cfg ManagerConfig) *Manager {
func NewRefManager(cfg ManagerConfig, storageCfg config.StorageConfig) *Manager {
var branchOwnership *distributed.MostlyCorrectOwner
if cfg.BranchApproximateOwnershipParams.RefreshInterval > 0 {
log := logging.ContextUnavailable().WithField("component", "RefManager approximate branch ownership")
Expand All @@ -130,6 +132,7 @@ func NewRefManager(cfg ManagerConfig) *Manager {
commitCache: newCache(cfg.CommitCacheConfig),
maxBatchDelay: cfg.MaxBatchDelay,
branchOwnership: branchOwnership,
storageConfig: storageCfg,
}
}

Expand All @@ -142,6 +145,13 @@ func (m *Manager) getRepository(ctx context.Context, repositoryID graveler.Repos
}
return nil, err
}
repo := graveler.RepoFromProto(&data)
if repo.StorageID == config.SingleBlockstoreID {
if storage := m.storageConfig.GetStorageByID(config.SingleBlockstoreID); storage != nil {
repo.StorageID = graveler.StorageID(storage.ID()) // Will return the real actual ID
}
}

return graveler.RepoFromProto(&data), nil
}

Expand Down Expand Up @@ -232,7 +242,7 @@ func (m *Manager) CreateBareRepository(ctx context.Context, repositoryID gravele
}

func (m *Manager) ListRepositories(ctx context.Context) (graveler.RepositoryIterator, error) {
return NewRepositoryIterator(ctx, m.kvStore)
return NewRepositoryIterator(ctx, m.kvStore, m.storageConfig)
}

func (m *Manager) updateRepoState(ctx context.Context, repo *graveler.RepositoryRecord, state graveler.RepositoryState) error {
Expand Down
42 changes: 40 additions & 2 deletions pkg/graveler/ref/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/batch"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/graveler/ref"
"github.com/treeverse/lakefs/pkg/ident"
Expand All @@ -27,6 +28,42 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

// Mock AdapterConfig only ID method returns the id field
type AdapterConfigMock struct {
config.AdapterConfig
id string
}

func (s *AdapterConfigMock) ID() string {
return s.id
}

// Mock Store only GetStorageByID method returns
// for SingleBlockstoreID AdapterConfig that return the bcID for the ID method
// for any other AdapterConfig that return the storageID for the ID method
type storeMock struct {
config.StorageConfig
bcID string
t *testing.T
}

func (s *storeMock) GetStorageByID(storageID string) config.AdapterConfig {
if storageID == config.SingleBlockstoreID {
return &AdapterConfigMock{
id: s.bcID,
}
}
return &AdapterConfigMock{
id: storageID,
}
}

func NewStorageConfigMock(bcID string) config.StorageConfig {
return &storeMock{
bcID: bcID,
}
}

// TestManager_GetRepositoryCache test get repository information while using cache. Match the number of times we
// call get repository vs number of times we fetch the data.
func TestManager_GetRepositoryCache(t *testing.T) {
Expand All @@ -50,7 +87,8 @@ func TestManager_GetRepositoryCache(t *testing.T) {
RepositoryCacheConfig: cacheConfig,
CommitCacheConfig: cacheConfig,
}
refManager := ref.NewRefManager(cfg)

refManager := ref.NewRefManager(cfg, NewStorageConfigMock(config.SingleBlockstoreID))
for i := 0; i < calls; i++ {
_, err := refManager.GetRepository(ctx, "repo1")
if err != nil {
Expand Down Expand Up @@ -96,7 +134,7 @@ func TestManager_GetCommitCache(t *testing.T) {
RepositoryCacheConfig: cacheConfig,
CommitCacheConfig: cacheConfig,
}
refManager := ref.NewRefManager(cfg)
refManager := ref.NewRefManager(cfg, NewStorageConfigMock(config.SingleBlockstoreID))
for i := 0; i < calls; i++ {
_, err := refManager.GetCommit(ctx, &graveler.RepositoryRecord{
RepositoryID: repoID,
Expand Down
34 changes: 23 additions & 11 deletions pkg/graveler/ref/repository_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,40 @@ import (
"context"
"errors"

"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/kv"
)

func updateStorageID(repo *graveler.RepositoryRecord, storageConfig config.StorageConfig) {
if repo.StorageID == config.SingleBlockstoreID {
if storage := storageConfig.GetStorageByID(config.SingleBlockstoreID); storage != nil {
repo.StorageID = graveler.StorageID(storage.ID()) // Will return the real actual ID
}
}
}

type RepositoryIterator struct {
ctx context.Context
it kv.MessageIterator
err error
value *graveler.RepositoryRecord
store kv.Store
closed bool
ctx context.Context
storageConfig config.StorageConfig
it kv.MessageIterator
err error
value *graveler.RepositoryRecord
store kv.Store
closed bool
}

func NewRepositoryIterator(ctx context.Context, store kv.Store) (*RepositoryIterator, error) {
func NewRepositoryIterator(ctx context.Context, store kv.Store, storageConfig config.StorageConfig) (*RepositoryIterator, error) {
it, err := kv.NewPrimaryIterator(ctx, store, (&graveler.RepositoryData{}).ProtoReflect().Type(), graveler.RepositoriesPartition(), []byte(graveler.RepoPath("")), kv.IteratorOptionsAfter([]byte{}))
if err != nil {
return nil, err
}
return &RepositoryIterator{
ctx: ctx,
it: it,
store: store,
closed: false,
ctx: ctx,
storageConfig: storageConfig,
it: it,
store: store,
closed: false,
}, nil
}

Expand All @@ -52,6 +63,7 @@ func (ri *RepositoryIterator) Next() bool {
}

ri.value = graveler.RepoFromProto(repo)
updateStorageID(ri.value, ri.storageConfig)
return true
}

Expand Down
71 changes: 66 additions & 5 deletions pkg/graveler/ref/repository_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ref_test

import (
"context"
"github.com/treeverse/lakefs/pkg/config"
"testing"
"time"

Expand All @@ -14,6 +15,66 @@ import (
"github.com/treeverse/lakefs/pkg/testutil"
)

func TestStorageIDForRepositoryIterator(t *testing.T) {
tt := []struct {
name string
repoNames []string
storageIDs []string
bcStorageID string
expectedStorageIDs []string
}{
{
name: "no storage id",
repoNames: []string{"a", "b", "c"},
bcStorageID: "foo",
storageIDs: []string{"", "", ""},
expectedStorageIDs: []string{"foo", "foo", "foo"},
},
{
name: "only storage id",
repoNames: []string{"a", "b", "c"},
storageIDs: []string{"bar", "baz", "qux"},
bcStorageID: "foo",
expectedStorageIDs: []string{"bar", "baz", "qux"},
},
{
name: "mixed storage id",
repoNames: []string{"a", "b", "c"},
storageIDs: []string{"bar", "", "qux"},
bcStorageID: "foo",
expectedStorageIDs: []string{"bar", "foo", "qux"},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
r, store := testRefManager(t)
repos := tc.repoNames
// prepare data
for i, repoId := range repos {
_, err := r.CreateRepository(context.Background(), graveler.RepositoryID(repoId), graveler.Repository{
StorageNamespace: "s3://foo",
CreationDate: time.Now(),
DefaultBranchID: "main",
StorageID: graveler.StorageID(tc.storageIDs[i]),
})
testutil.Must(t, err)
}
iterator, err := ref.NewRepositoryIterator(context.Background(), store, &storeMock{bcID: tc.bcStorageID, t: t})
require.NoError(t, err)
for expected := range tc.expectedStorageIDs {
if !iterator.Next() {
t.Fatalf("expected to have more repos")
}
repo := iterator.Value()
if repo.StorageID.String() != tc.expectedStorageIDs[expected] {
t.Fatalf("expected storage ID %s, got %s", tc.expectedStorageIDs[expected], repo.StorageID)
}
}
})

}
}

func TestRepositoryIterator(t *testing.T) {
r, store := testRefManager(t)
repos := []graveler.RepositoryID{"a", "aa", "b", "c", "e", "d", "f"}
Expand All @@ -29,7 +90,7 @@ func TestRepositoryIterator(t *testing.T) {
}

t.Run("listing all repos", func(t *testing.T) {
iter, err := ref.NewRepositoryIterator(context.Background(), store)
iter, err := ref.NewRepositoryIterator(context.Background(), store, NewStorageConfigMock(config.SingleBlockstoreID))
require.NoError(t, err)
repoIds := make([]graveler.RepositoryID, 0)
for iter.Next() {
Expand All @@ -47,7 +108,7 @@ func TestRepositoryIterator(t *testing.T) {
})

t.Run("listing repos from prefix", func(t *testing.T) {
iter, err := ref.NewRepositoryIterator(context.Background(), store)
iter, err := ref.NewRepositoryIterator(context.Background(), store, NewStorageConfigMock(config.SingleBlockstoreID))
require.NoError(t, err)
iter.SeekGE("b")
repoIds := make([]graveler.RepositoryID, 0)
Expand All @@ -66,7 +127,7 @@ func TestRepositoryIterator(t *testing.T) {
})

t.Run("listing repos SeekGE", func(t *testing.T) {
iter, err := ref.NewRepositoryIterator(context.Background(), store)
iter, err := ref.NewRepositoryIterator(context.Background(), store, NewStorageConfigMock(config.SingleBlockstoreID))
require.NoError(t, err)
iter.SeekGE("b")
repoIds := make([]graveler.RepositoryID, 0)
Expand Down Expand Up @@ -108,7 +169,7 @@ func TestRepositoryIterator_CloseTwice(t *testing.T) {
entIt.EXPECT().Close().Times(1)
store := mock.NewMockStore(ctrl)
store.EXPECT().Scan(ctx, gomock.Any(), gomock.Any()).Return(entIt, nil).Times(1)
it, err := ref.NewRepositoryIterator(ctx, store)
it, err := ref.NewRepositoryIterator(ctx, store, nil)
if err != nil {
t.Fatal("NewRepositoryIterator failed", err)
}
Expand All @@ -124,7 +185,7 @@ func TestRepositoryIterator_NextClosed(t *testing.T) {
entIt.EXPECT().Close().Times(1)
store := mock.NewMockStore(ctrl)
store.EXPECT().Scan(ctx, gomock.Any(), gomock.Any()).Return(entIt, nil).Times(1)
it, err := ref.NewRepositoryIterator(ctx, store)
it, err := ref.NewRepositoryIterator(ctx, store, nil)
if err != nil {
t.Fatal("NewRepositoryIterator failed", err)
}
Expand Down

0 comments on commit 364c903

Please sign in to comment.