Skip to content

Commit

Permalink
Instantiate data sources from the policy engine
Browse files Browse the repository at this point in the history
This hooks data source registration into Minder's policy engine. This is
done by building an individual data source registry for each rule type
engine, each of which pertains to a single rule type. This way, each rule
type only has access to the data sources it instantiates.

This also uncovered a segfault we were unaware of. The builder of the
rule type engine cache was not populating all the needed members of the
struct, so a lookup would segfault whenever the cache did not have a
hit. This fixes that.

Signed-off-by: Juan Antonio Osorio <[email protected]>
  • Loading branch information
JAORMX committed Nov 27, 2024
1 parent 2b44edb commit 58e6ee4
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 8 deletions.
5 changes: 5 additions & 0 deletions internal/datasources/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func (d *dataSourceService) BuildDataSourceRegistry(
instantiations := rt.GetDef().GetEval().GetDataSources()
reg := v1datasources.NewDataSourceRegistry()

// return early so we don't need to do useless work
if len(instantiations) == 0 {
return reg, nil
}

stx, err := d.txBuilder(d, opts)
if err != nil {
return nil, fmt.Errorf("failed to start transaction: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions internal/engine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/open-feature/go-sdk/openfeature"
"github.com/rs/zerolog"

datasourceservice "github.com/mindersec/minder/internal/datasources/service"
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/actions"
"github.com/mindersec/minder/internal/engine/actions/alert"
Expand Down Expand Up @@ -112,6 +113,8 @@ func (e *executor) EvalEntityEvent(ctx context.Context, inf *entities.EntityInfo

defer e.releaseLockAndFlush(ctx, inf)

dssvc := datasourceservice.NewDataSourceService(e.querier)

entityType := entities.EntityTypeToDB(inf.Type)
// Load all the relevant rule type engines for this entity
ruleEngineCache, err := rtengine.NewRuleEngineCache(
Expand All @@ -121,6 +124,7 @@ func (e *executor) EvalEntityEvent(ctx context.Context, inf *entities.EntityInfo
inf.ProjectID,
provider,
ingestCache,
dssvc,
eoptions.WithFlagsClient(e.featureFlags),
)
if err != nil {
Expand Down
36 changes: 32 additions & 4 deletions internal/engine/rtengine/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/google/uuid"

datasourceservice "github.com/mindersec/minder/internal/datasources/service"
"github.com/mindersec/minder/internal/db"
"github.com/mindersec/minder/internal/engine/ingestcache"
eoptions "github.com/mindersec/minder/internal/engine/options"
Expand All @@ -32,6 +33,7 @@ type ruleEngineCache struct {
provider provinfv1.Provider
ingestCache ingestcache.Cache
engines cacheType
dssvc datasourceservice.DataSourcesService
opts []eoptions.Option
}

Expand All @@ -40,13 +42,15 @@ type ruleEngineCache struct {
// for this entity and project hierarchy.
func NewRuleEngineCache(
ctx context.Context,
store db.Querier,
store db.Store,
entityType db.Entities,
projectID uuid.UUID,
provider provinfv1.Provider,
ingestCache ingestcache.Cache,
dssvc datasourceservice.DataSourcesService,
opts ...eoptions.Option,
) (Cache, error) {

// Get the full project hierarchy
hierarchy, err := store.GetParentProjects(ctx, projectID)
if err != nil {
Expand All @@ -66,14 +70,22 @@ func NewRuleEngineCache(
// Populate the cache with rule type engines for the rule types we found.
engines := make(cacheType, len(ruleTypes))
for _, ruleType := range ruleTypes {
ruleEngine, err := cacheRuleEngine(ctx, &ruleType, provider, ingestCache, engines, opts...)
ruleEngine, err := cacheRuleEngine(
ctx, &ruleType, provider, ingestCache, engines, dssvc, opts...)
if err != nil {
return nil, err
}
engines[ruleType.ID] = ruleEngine
}

return &ruleEngineCache{engines: engines, opts: opts}, nil
return &ruleEngineCache{
store: store,
provider: provider,
ingestCache: ingestCache,
engines: engines,
opts: opts,
dssvc: dssvc,
}, nil
}

func (r *ruleEngineCache) GetRuleEngine(ctx context.Context, ruleTypeID uuid.UUID) (*rtengine2.RuleTypeEngine, error) {
Expand All @@ -100,7 +112,7 @@ func (r *ruleEngineCache) GetRuleEngine(ctx context.Context, ruleTypeID uuid.UUI
}

// If we find the rule type, insert into the cache and return.
ruleTypeEngine, err := cacheRuleEngine(ctx, &ruleType, r.provider, r.ingestCache, r.engines, r.opts...)
ruleTypeEngine, err := cacheRuleEngine(ctx, &ruleType, r.provider, r.ingestCache, r.engines, r.dssvc, r.opts...)
if err != nil {
return nil, fmt.Errorf("error while caching rule type engine: %w", err)
}
Expand All @@ -113,6 +125,7 @@ func cacheRuleEngine(
provider provinfv1.Provider,
ingestCache ingestcache.Cache,
engineCache cacheType,
dssvc datasourceservice.DataSourcesService,
opts ...eoptions.Option,
) (*rtengine2.RuleTypeEngine, error) {
// Parse the rule type
Expand All @@ -121,6 +134,21 @@ func cacheRuleEngine(
return nil, fmt.Errorf("error parsing rule type when parsing rule type %s: %w", ruleType.ID, err)
}

// Build a registry instance per rule type. This allows us to have an
// isolated data source list per instance of the rule type engine which is
// what we want. We don't want rule types using data sources they haven't
// instantiated. It is in this spot that we would add something like a cache
// so data sources could optimize in a per-execution context.
//
// TODO: Do we need to pass in a transaction here?
// TODO: We _might_ want to pass in a slice of the hierarchy here.
dsreg, err := dssvc.BuildDataSourceRegistry(ctx, pbRuleType, nil)
if err != nil {
return nil, fmt.Errorf("error building data source registry: %w", err)
}

opts = append(opts, eoptions.WithDataSources(dsreg))

// Create the rule type engine
ruleEngine, err := rtengine2.NewRuleTypeEngine(ctx, pbRuleType, provider, opts...)
if err != nil {
Expand Down
146 changes: 142 additions & 4 deletions internal/engine/rtengine/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,145 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

mockdssvc "github.com/mindersec/minder/internal/datasources/service/mock"
"github.com/mindersec/minder/internal/db"
dbf "github.com/mindersec/minder/internal/db/fixtures"
"github.com/mindersec/minder/internal/engine/ingestcache"
"github.com/mindersec/minder/internal/providers/testproviders"
v1datasources "github.com/mindersec/minder/pkg/datasources/v1"
rtengine2 "github.com/mindersec/minder/pkg/engine/v1/rtengine"
)

// TestNewRuleTypeEngineCacheConstructor drives NewRuleEngineCache with a
// mocked store and a mocked data source service. Each scenario either expects
// an error containing a given substring (and a nil cache), or expects success,
// in which case every member of the resulting ruleEngineCache must be set.
func TestNewRuleTypeEngineCacheConstructor(t *testing.T) {
	t.Parallel()

	// DB expectations shared by the scenarios that reach the data source
	// registry step: a hierarchy of two projects, and a single rule type owned
	// by the first project whose definition is ruleDefJSON.
	withSingleRuleType := func(mock dbf.DBMock) {
		projects := []uuid.UUID{uuid.New(), uuid.New()}
		// Calls from the engine builder itself
		mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).
			Return(projects, nil)
		mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()).
			Return([]db.RuleType{{
				ID:         uuid.New(),
				ProjectID:  projects[0],
				Definition: []byte(ruleDefJSON),
			}}, nil)
	}

	testCases := []struct {
		Name           string
		DBSetup        dbf.DBMockBuilder
		DSServiceSetup func(service *mockdssvc.MockDataSourcesService)
		ExpectedError  string
	}{
		{
			Name: "Returns error when getting parent projects fails",
			DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) {
				mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).Return(nil, errTest)
			}),
			ExpectedError: "error getting parent projects",
		},
		{
			Name: "Returns error when getting rule types fails",
			DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) {
				mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).
					Return([]uuid.UUID{uuid.New()}, nil)
				mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()).
					Return(nil, errTest)
			}),
			ExpectedError: "error while retrieving rule types",
		},
		{
			Name: "Returns error when getting rule type with no def",
			DBSetup: dbf.NewDBMock(func(mock dbf.DBMock) {
				mock.EXPECT().GetParentProjects(gomock.Any(), gomock.Any()).
					Return([]uuid.UUID{uuid.New()}, nil)
				mock.EXPECT().GetRuleTypesByEntityInHierarchy(gomock.Any(), gomock.Any()).
					Return([]db.RuleType{{ID: uuid.New()}}, nil)
			}),
			ExpectedError: "cannot unmarshal rule type definition",
		},
		{
			Name:    "Returns error when building data source registry fails",
			DBSetup: dbf.NewDBMock(withSingleRuleType),
			DSServiceSetup: func(service *mockdssvc.MockDataSourcesService) {
				service.EXPECT().BuildDataSourceRegistry(gomock.Any(), gomock.Any(), gomock.Any()).
					Return(nil, errTest)
			},
			ExpectedError: errTest.Error(),
		},
		{
			Name:    "Creates rule engine cache",
			DBSetup: dbf.NewDBMock(withSingleRuleType),
			DSServiceSetup: func(service *mockdssvc.MockDataSourcesService) {
				service.EXPECT().BuildDataSourceRegistry(gomock.Any(), gomock.Any(), gomock.Any()).
					Return(v1datasources.NewDataSourceRegistry(), nil)
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.Name, func(t *testing.T) {
			t.Parallel()

			ctrl := gomock.NewController(t)
			defer ctrl.Finish()

			dsService := mockdssvc.NewMockDataSourcesService(ctrl)

			// A scenario without DBSetup leaves the store as a nil interface.
			var store db.Store
			if tc.DBSetup != nil {
				store = tc.DBSetup(ctrl)
			}

			if tc.DSServiceSetup != nil {
				tc.DSServiceSetup(dsService)
			}

			cache, err := NewRuleEngineCache(
				context.Background(),
				store,
				db.EntitiesRepository,
				uuid.New(),
				testproviders.NewGitProvider(nil),
				ingestcache.NewNoopCache(),
				dsService,
			)

			// Failure scenarios: only the error text and the nil cache matter.
			if tc.ExpectedError != "" {
				require.ErrorContains(t, err, tc.ExpectedError)
				require.Nil(t, cache)
				return
			}

			require.NoError(t, err)
			require.NotNil(t, cache)

			// Guard against the constructor leaving struct members
			// uninitialized, which is the issue we previously ran into.
			concrete, isConcrete := cache.(*ruleEngineCache)
			require.True(t, isConcrete)
			require.NotNil(t, concrete.store)
			require.NotNil(t, concrete.provider)
			require.NotNil(t, concrete.ingestCache)
			require.NotNil(t, concrete.engines)
			require.NotNil(t, concrete.dssvc)
		})
	}
}

func TestGetRuleEngine(t *testing.T) {
t.Parallel()

scenarios := []struct {
Name string
Cache cacheType
DBSetup dbf.DBMockBuilder
ExpectedError string
Name string
Cache cacheType
DBSetup dbf.DBMockBuilder
ExpectedError string
dsRegistryError error
}{
{
Name: "Retrieves rule engine from cache",
Expand Down Expand Up @@ -62,6 +186,13 @@ func TestGetRuleEngine(t *testing.T) {
Cache: cacheType{},
DBSetup: dbf.NewDBMock(withRuleTypeLookup(&ruleType, nil)),
},
{
Name: "Returns error when building data source registry fails",
Cache: cacheType{},
DBSetup: dbf.NewDBMock(withRuleTypeLookup(&ruleType, nil)),
dsRegistryError: errTest,
ExpectedError: errTest.Error(),
},
}

for _, scenario := range scenarios {
Expand All @@ -77,11 +208,18 @@ func TestGetRuleEngine(t *testing.T) {
store = scenario.DBSetup(ctrl)
}

dssvc := mockdssvc.NewMockDataSourcesService(ctrl)
reg := v1datasources.NewDataSourceRegistry()

dssvc.EXPECT().BuildDataSourceRegistry(gomock.Any(), gomock.Any(), gomock.Any()).
Return(reg, scenario.dsRegistryError).AnyTimes()

cache := ruleEngineCache{
store: store,
provider: testproviders.NewGitProvider(nil),
ingestCache: ingestcache.NewNoopCache(),
engines: scenario.Cache,
dssvc: dssvc,
}

result, err := cache.GetRuleEngine(ctx, ruleTypeID)
Expand Down

0 comments on commit 58e6ee4

Please sign in to comment.