From 58e6ee49c15434460693deb9eee77d7d186b0543 Mon Sep 17 00:00:00 2001 From: Juan Antonio Osorio Date: Wed, 27 Nov 2024 10:30:38 +0200 Subject: [PATCH] Instantiate data sources from the policy engine This hooks data source registration into Minder's policy engine. This is done by populating individual data source registries into each rule type engine, which pertains to a single rule type. This way, each rule type only has access to the data sources it instantiates. This also caught a segfault we were unaware of. For some reason, the builder of the rule type engine cache was not populating all the needed members of the struct, which would segfault whenever the cache did not have a hit. This fixes that. Signed-off-by: Juan Antonio Osorio --- internal/datasources/service/service.go | 5 + internal/engine/executor.go | 4 + internal/engine/rtengine/cache.go | 36 +++++- internal/engine/rtengine/cache_test.go | 146 +++++++++++++++++++++++- 4 files changed, 183 insertions(+), 8 deletions(-) diff --git a/internal/datasources/service/service.go b/internal/datasources/service/service.go index 6bf70ca171..cdc4c0bd6b 100644 --- a/internal/datasources/service/service.go +++ b/internal/datasources/service/service.go @@ -291,6 +291,11 @@ func (d *dataSourceService) BuildDataSourceRegistry( instantiations := rt.GetDef().GetEval().GetDataSources() reg := v1datasources.NewDataSourceRegistry() + // return early so we don't need to do useless work + if len(instantiations) == 0 { + return reg, nil + } + stx, err := d.txBuilder(d, opts) if err != nil { return nil, fmt.Errorf("failed to start transaction: %w", err) diff --git a/internal/engine/executor.go b/internal/engine/executor.go index d2b8d68c4e..2007028551 100644 --- a/internal/engine/executor.go +++ b/internal/engine/executor.go @@ -12,6 +12,7 @@ import ( "github.com/open-feature/go-sdk/openfeature" "github.com/rs/zerolog" + datasourceservice "github.com/mindersec/minder/internal/datasources/service" 
"github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/actions" "github.com/mindersec/minder/internal/engine/actions/alert" @@ -112,6 +113,8 @@ func (e *executor) EvalEntityEvent(ctx context.Context, inf *entities.EntityInfo defer e.releaseLockAndFlush(ctx, inf) + dssvc := datasourceservice.NewDataSourceService(e.querier) + entityType := entities.EntityTypeToDB(inf.Type) // Load all the relevant rule type engines for this entity ruleEngineCache, err := rtengine.NewRuleEngineCache( @@ -121,6 +124,7 @@ func (e *executor) EvalEntityEvent(ctx context.Context, inf *entities.EntityInfo inf.ProjectID, provider, ingestCache, + dssvc, eoptions.WithFlagsClient(e.featureFlags), ) if err != nil { diff --git a/internal/engine/rtengine/cache.go b/internal/engine/rtengine/cache.go index 6023c6aad2..cb0fcd5f9e 100644 --- a/internal/engine/rtengine/cache.go +++ b/internal/engine/rtengine/cache.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" + datasourceservice "github.com/mindersec/minder/internal/datasources/service" "github.com/mindersec/minder/internal/db" "github.com/mindersec/minder/internal/engine/ingestcache" eoptions "github.com/mindersec/minder/internal/engine/options" @@ -32,6 +33,7 @@ type ruleEngineCache struct { provider provinfv1.Provider ingestCache ingestcache.Cache engines cacheType + dssvc datasourceservice.DataSourcesService opts []eoptions.Option } @@ -40,13 +42,15 @@ type ruleEngineCache struct { // for this entity and project hierarchy. 
func NewRuleEngineCache( ctx context.Context, - store db.Querier, + store db.Store, entityType db.Entities, projectID uuid.UUID, provider provinfv1.Provider, ingestCache ingestcache.Cache, + dssvc datasourceservice.DataSourcesService, opts ...eoptions.Option, ) (Cache, error) { + // Get the full project hierarchy hierarchy, err := store.GetParentProjects(ctx, projectID) if err != nil { @@ -66,14 +70,22 @@ func NewRuleEngineCache( // Populate the cache with rule type engines for the rule types we found. engines := make(cacheType, len(ruleTypes)) for _, ruleType := range ruleTypes { - ruleEngine, err := cacheRuleEngine(ctx, &ruleType, provider, ingestCache, engines, opts...) + ruleEngine, err := cacheRuleEngine( + ctx, &ruleType, provider, ingestCache, engines, dssvc, opts...) if err != nil { return nil, err } engines[ruleType.ID] = ruleEngine } - return &ruleEngineCache{engines: engines, opts: opts}, nil + return &ruleEngineCache{ + store: store, + provider: provider, + ingestCache: ingestCache, + engines: engines, + opts: opts, + dssvc: dssvc, + }, nil } func (r *ruleEngineCache) GetRuleEngine(ctx context.Context, ruleTypeID uuid.UUID) (*rtengine2.RuleTypeEngine, error) { @@ -100,7 +112,7 @@ func (r *ruleEngineCache) GetRuleEngine(ctx context.Context, ruleTypeID uuid.UUI } // If we find the rule type, insert into the cache and return. - ruleTypeEngine, err := cacheRuleEngine(ctx, &ruleType, r.provider, r.ingestCache, r.engines, r.opts...) + ruleTypeEngine, err := cacheRuleEngine(ctx, &ruleType, r.provider, r.ingestCache, r.engines, r.dssvc, r.opts...) 
if err != nil { return nil, fmt.Errorf("error while caching rule type engine: %w", err) } @@ -113,6 +125,7 @@ func cacheRuleEngine( provider provinfv1.Provider, ingestCache ingestcache.Cache, engineCache cacheType, + dssvc datasourceservice.DataSourcesService, opts ...eoptions.Option, ) (*rtengine2.RuleTypeEngine, error) { // Parse the rule type @@ -121,6 +134,21 @@ func cacheRuleEngine( return nil, fmt.Errorf("error parsing rule type when parsing rule type %s: %w", ruleType.ID, err) } + // Build a registry instance per rule type. This allows us to have an + // isolated data source list per instance of the rule type engine which is + // what we want. We don't want rule types using data sources they haven't + // instantiated. It is in this spot that we would add something like a cache + // so data sources could optimize in a per-execution context. + // + // TODO: Do we need to pass in a transaction here? + // TODO: We _might_ want to pass in a slice of the hierarchy here. + dsreg, err := dssvc.BuildDataSourceRegistry(ctx, pbRuleType, nil) + if err != nil { + return nil, fmt.Errorf("error building data source registry: %w", err) + } + + opts = append(opts, eoptions.WithDataSources(dsreg)) + // Create the rule type engine ruleEngine, err := rtengine2.NewRuleTypeEngine(ctx, pbRuleType, provider, opts...) 
if err != nil { diff --git a/internal/engine/rtengine/cache_test.go b/internal/engine/rtengine/cache_test.go index b1338faab7..a492934415 100644 --- a/internal/engine/rtengine/cache_test.go +++ b/internal/engine/rtengine/cache_test.go @@ -13,21 +13,145 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" + mockdssvc "github.com/mindersec/minder/internal/datasources/service/mock" "github.com/mindersec/minder/internal/db" dbf "github.com/mindersec/minder/internal/db/fixtures" "github.com/mindersec/minder/internal/engine/ingestcache" "github.com/mindersec/minder/internal/providers/testproviders" + v1datasources "github.com/mindersec/minder/pkg/datasources/v1" rtengine2 "github.com/mindersec/minder/pkg/engine/v1/rtengine" ) +func TestNewRuleTypeEngineCacheConstructor(t *testing.T) { + t.Parallel() + + scenarios := []struct { + Name string + DBSetup dbf.DBMockBuilder + DSServiceSetup func(service *mockdssvc.MockDataSourcesService) + ExpectedError string + }{ + { + Name: "Returns error when getting parent projects fails", + DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) { + mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).Return(nil, errTest) + }), + ExpectedError: "error getting parent projects", + }, + { + Name: "Returns error when getting rule types fails", + DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) { + mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()). + Return([]uuid.UUID{uuid.New()}, nil) + mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()). + Return(nil, errTest) + }), + ExpectedError: "error while retrieving rule types", + }, + { + Name: "Returns error when getting rule type with no def", + DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) { + mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()). + Return([]uuid.UUID{uuid.New()}, nil) + mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()). 
+ Return([]db.RuleType{{ID: uuid.New()}}, nil) + }), + ExpectedError: "cannot unmarshal rule type definition", + }, + { + Name: "Returns error when building data source registry fails", + DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) { + hierarchy := []uuid.UUID{uuid.New(), uuid.New()} + // Calls from the engine builder itself + mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()). + Return(hierarchy, nil) + mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()). + Return([]db.RuleType{{ + ID: uuid.New(), + ProjectID: hierarchy[0], + Definition: []byte(ruleDefJSON), + }}, nil) + }), + DSServiceSetup: func(service *mockdssvc.MockDataSourcesService) { + service.EXPECT().BuildDataSourceRegistry(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, errTest) + }, + ExpectedError: errTest.Error(), + }, + { + Name: "Creates rule engine cache", + DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) { + hierarchy := []uuid.UUID{uuid.New(), uuid.New()} + // Calls from the engine builder itself + mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()). + Return(hierarchy, nil) + mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()). + Return([]db.RuleType{{ + ID: uuid.New(), + ProjectID: hierarchy[0], + Definition: []byte(ruleDefJSON), + }}, nil) + }), + DSServiceSetup: func(service *mockdssvc.MockDataSourcesService) { + service.EXPECT().BuildDataSourceRegistry(gomock.Any(), gomock.Any(), gomock.Any()). 
+ Return(v1datasources.NewDataSourceRegistry(), nil) + }, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.Name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx := context.Background() + + dssvc := mockdssvc.NewMockDataSourcesService(ctrl) + + var store db.Store + if scenario.DBSetup != nil { + store = scenario.DBSetup(ctrl) + } + + if scenario.DSServiceSetup != nil { + scenario.DSServiceSetup(dssvc) + } + + cache, err := NewRuleEngineCache( + ctx, store, db.EntitiesRepository, uuid.New(), + testproviders.NewGitProvider(nil), ingestcache.NewNoopCache(), + dssvc) + if scenario.ExpectedError != "" { + require.ErrorContains(t, err, scenario.ExpectedError) + require.Nil(t, cache) + } else { + require.NoError(t, err) + require.NotNil(t, cache) + + // Ensure members are not null so we don't fall on the same issue + // we had of not initializing them. + impl, ok := cache.(*ruleEngineCache) + require.True(t, ok) + require.NotNil(t, impl.store) + require.NotNil(t, impl.provider) + require.NotNil(t, impl.ingestCache) + require.NotNil(t, impl.engines) + require.NotNil(t, impl.dssvc) + } + }) + } +} + func TestGetRuleEngine(t *testing.T) { t.Parallel() scenarios := []struct { - Name string - Cache cacheType - DBSetup dbf.DBMockBuilder - ExpectedError string + Name string + Cache cacheType + DBSetup dbf.DBMockBuilder + ExpectedError string + dsRegistryError error }{ { Name: "Retrieves rule engine from cache", @@ -62,6 +186,13 @@ func TestGetRuleEngine(t *testing.T) { Cache: cacheType{}, DBSetup: dbf.NewDBMock(withRuleTypeLookup(&ruleType, nil)), }, + { + Name: "Returns error when building data source registry fails", + Cache: cacheType{}, + DBSetup: dbf.NewDBMock(withRuleTypeLookup(&ruleType, nil)), + dsRegistryError: errTest, + ExpectedError: errTest.Error(), + }, } for _, scenario := range scenarios { @@ -77,11 +208,18 @@ func TestGetRuleEngine(t *testing.T) { store = scenario.DBSetup(ctrl) } + dssvc := 
mockdssvc.NewMockDataSourcesService(ctrl) + reg := v1datasources.NewDataSourceRegistry() + + dssvc.EXPECT().BuildDataSourceRegistry(gomock.Any(), gomock.Any(), gomock.Any()). + Return(reg, scenario.dsRegistryError).AnyTimes() + cache := ruleEngineCache{ store: store, provider: testproviders.NewGitProvider(nil), ingestCache: ingestcache.NewNoopCache(), engines: scenario.Cache, + dssvc: dssvc, } result, err := cache.GetRuleEngine(ctx, ruleTypeID)