Skip to content

Commit

Permalink
DiscoveryConfigStatus: update even when no resource is found (#50433) (
Browse files Browse the repository at this point in the history
…#50766)

* DiscoveryConfigStatus: update even when no resource is found

During Auto Discover, when using a DiscoveryConfig, if no resources are
found, the DiscoveryConfigStatus is not updated accordingly.

This PR ensures that, even when no resources are found, the status will
report so.

* use comparable instead of any for generic method

* remove useless un-named function on PreFetchHooks

* prevent call to ssm:SendCommand with 0 instances

* rename var from ok to found
  • Loading branch information
marcoandredinis authored Jan 7, 2025
1 parent d7fd22d commit 083798c
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 57 deletions.
46 changes: 33 additions & 13 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,7 @@ func (s *Server) startDatabaseWatchers() error {
TriggerFetchC: s.newDiscoveryConfigChangedSub(),
Origin: types.OriginCloud,
Clock: s.clock,
PreFetchHookFn: func() {
discoveryConfigs := slices.FilterMapUnique(
s.getAllDatabaseFetchers(),
func(f common.Fetcher) (s string, include bool) {
return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsRDSResourcesStatus.reset()
},
PreFetchHookFn: s.databaseWatcherIterationStarted,
},
)
if err != nil {
Expand Down Expand Up @@ -151,6 +141,38 @@ func (s *Server) startDatabaseWatchers() error {
return nil
}

func (s *Server) databaseWatcherIterationStarted() {
allFetchers := s.getAllDatabaseFetchers()
if len(allFetchers) == 0 {
return
}

s.submitFetchersEvent(allFetchers)

awsResultGroups := slices.FilterMapUnique(
allFetchers,
func(f common.Fetcher) (awsResourceGroup, bool) {
include := f.GetDiscoveryConfigName() != "" && f.IntegrationName() != ""
resourceGroup := awsResourceGroup{
discoveryConfigName: f.GetDiscoveryConfigName(),
integration: f.IntegrationName(),
}
return resourceGroup, include
},
)

for _, g := range awsResultGroups {
s.awsRDSResourcesStatus.iterationStarted(g)
}

discoveryConfigs := slices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) {
return g.discoveryConfigName, true
})
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsRDSResourcesStatus.reset()
}

func (s *Server) getAllDatabaseFetchers() []common.Fetcher {
allFetchers := make([]common.Fetcher, 0, len(s.databaseFetchers))

Expand All @@ -162,8 +184,6 @@ func (s *Server) getAllDatabaseFetchers() []common.Fetcher {

allFetchers = append(allFetchers, s.databaseFetchers...)

s.submitFetchersEvent(allFetchers)

return allFetchers
}

Expand Down
49 changes: 33 additions & 16 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,18 +482,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
s.ctx, s.getAllAWSServerFetchers, s.caRotationCh,
server.WithPollInterval(s.PollInterval),
server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
server.WithPreFetchHookFn(func() {
discoveryConfigs := libslices.FilterMapUnique(
s.getAllAWSServerFetchers(),
func(f server.Fetcher) (s string, include bool) {
return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsEC2ResourcesStatus.reset()
s.awsEC2Tasks.reset()
}),
server.WithPreFetchHookFn(s.ec2WatcherIterationStarted),
)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -534,6 +523,38 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
return nil
}

func (s *Server) ec2WatcherIterationStarted() {
allFetchers := s.getAllAWSServerFetchers()
if len(allFetchers) == 0 {
return
}

s.submitFetchEvent(types.CloudAWS, types.AWSMatcherEC2)

awsResultGroups := libslices.FilterMapUnique(
allFetchers,
func(f server.Fetcher) (awsResourceGroup, bool) {
include := f.GetDiscoveryConfigName() != "" && f.IntegrationName() != ""
resourceGroup := awsResourceGroup{
discoveryConfigName: f.GetDiscoveryConfigName(),
integration: f.IntegrationName(),
}
return resourceGroup, include
},
)
for _, g := range awsResultGroups {
s.awsEC2ResourcesStatus.iterationStarted(g)
}

discoveryConfigs := libslices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) {
return g.discoveryConfigName, true
})
s.updateDiscoveryConfigStatus(discoveryConfigs...)
s.awsEC2ResourcesStatus.reset()

s.awsEC2Tasks.reset()
}

func (s *Server) initKubeAppWatchers(matchers []types.KubernetesMatcher) error {
if len(matchers) == 0 {
return nil
Expand Down Expand Up @@ -1442,10 +1463,6 @@ func (s *Server) getAllAWSServerFetchers() []server.Fetcher {

allFetchers = append(allFetchers, s.staticServerAWSFetchers...)

if len(allFetchers) > 0 {
s.submitFetchEvent(types.CloudAWS, types.AWSMatcherEC2)
}

return allFetchers
}

Expand Down
44 changes: 44 additions & 0 deletions lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,26 @@ func TestDiscoveryServer(t *testing.T) {
)
require.NoError(t, err)

dcForEC2StatusWithoutMatchName := uuid.NewString()
dcForEC2StatusWithoutMatch, err := discoveryconfig.NewDiscoveryConfig(
header.Metadata{Name: dcForEC2StatusWithoutMatchName},
discoveryconfig.Spec{
DiscoveryGroup: defaultDiscoveryGroup,
AWS: []types.AWSMatcher{{
Types: []string{"ec2"},
Regions: []string{"eu-central-1"},
Tags: map[string]utils.Strings{"teleport": {"yes"}},
SSM: &types.AWSSSM{DocumentName: "document"},
Params: &types.InstallerParams{
InstallTeleport: true,
EnrollMode: types.InstallParamEnrollMode_INSTALL_PARAM_ENROLL_MODE_SCRIPT,
},
Integration: "my-integration",
}},
},
)
require.NoError(t, err)

discoveryConfigForUserTaskEKSTestName := uuid.NewString()
discoveryConfigForUserTaskEKSTest, err := discoveryconfig.NewDiscoveryConfig(
header.Metadata{Name: discoveryConfigForUserTaskEKSTestName},
Expand Down Expand Up @@ -585,6 +605,30 @@ func TestDiscoveryServer(t *testing.T) {
},
wantInstalledInstances: []string{"instance-id-1"},
},
{
name: "no nodes found using DiscoveryConfig and Integration, but DiscoveryConfig Status is still updated",
presentInstances: []types.Server{},
ssm: &mockSSMClient{},
emitter: &mockEmitter{},
staticMatchers: Matchers{},
discoveryConfig: dcForEC2StatusWithoutMatch,
wantDiscoveryConfigStatus: &discoveryconfig.Status{
State: "DISCOVERY_CONFIG_STATE_SYNCING",
ErrorMessage: nil,
DiscoveredResources: 0,
LastSyncTime: fakeClock.Now().UTC(),
IntegrationDiscoveredResources: map[string]*discoveryconfigv1.IntegrationDiscoveredSummary{
"my-integration": {
AwsEc2: &discoveryconfigv1.ResourcesDiscoveredSummary{
Found: 0,
Enrolled: 0,
Failed: 0,
},
},
},
},
wantInstalledInstances: []string{},
},
{
name: "one node found but SSM Run fails and DiscoverEC2 User Task is created",
presentInstances: []types.Server{},
Expand Down
45 changes: 33 additions & 12 deletions lib/srv/discovery/kube_integration_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,7 @@ func (s *Server) startKubeIntegrationWatchers() error {
Interval: s.PollInterval,
Origin: types.OriginCloud,
TriggerFetchC: s.newDiscoveryConfigChangedSub(),
PreFetchHookFn: func() {
discoveryConfigs := libslices.FilterMapUnique(
s.getKubeIntegrationFetchers(),
func(f common.Fetcher) (s string, include bool) {
return f.GetDiscoveryConfigName(), f.GetDiscoveryConfigName() != ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsEKSResourcesStatus.reset()
s.awsEKSTasks.reset()
},
PreFetchHookFn: s.kubernetesIntegrationWatcherIterationStarted,
})
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -194,6 +183,38 @@ func (s *Server) startKubeIntegrationWatchers() error {
return nil
}

func (s *Server) kubernetesIntegrationWatcherIterationStarted() {
allFetchers := s.getKubeIntegrationFetchers()
if len(allFetchers) == 0 {
return
}

s.submitFetchersEvent(allFetchers)

awsResultGroups := libslices.FilterMapUnique(
allFetchers,
func(f common.Fetcher) (awsResourceGroup, bool) {
include := f.GetDiscoveryConfigName() != "" && f.IntegrationName() != ""
resourceGroup := awsResourceGroup{
discoveryConfigName: f.GetDiscoveryConfigName(),
integration: f.IntegrationName(),
}
return resourceGroup, include
},
)
for _, g := range awsResultGroups {
s.awsEKSResourcesStatus.iterationStarted(g)
}

discoveryConfigs := libslices.FilterMapUnique(awsResultGroups, func(g awsResourceGroup) (s string, include bool) {
return g.discoveryConfigName, true
})
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsEKSResourcesStatus.reset()
s.awsEKSTasks.reset()
}

func (s *Server) enrollEKSClusters(region, integration, discoveryConfigName string, clusters []types.DiscoveredEKSCluster, agentVersion string, mu *sync.Mutex, enrollingClusters map[string]bool) {
mu.Lock()
for _, c := range clusters {
Expand Down
9 changes: 9 additions & 0 deletions lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,15 @@ func (ars *awsResourcesStatus) incrementFailed(g awsResourceGroup, count int) {
ars.awsResourcesResults[g] = groupStats
}

func (ars *awsResourcesStatus) iterationStarted(g awsResourceGroup) {
ars.mu.Lock()
defer ars.mu.Unlock()
if ars.awsResourcesResults == nil {
ars.awsResourcesResults = make(map[awsResourceGroup]awsResourceGroupResult)
}
ars.awsResourcesResults[g] = awsResourceGroupResult{}
}

func (ars *awsResourcesStatus) incrementFound(g awsResourceGroup, count int) {
ars.mu.Lock()
defer ars.mu.Unlock()
Expand Down
9 changes: 9 additions & 0 deletions lib/srv/server/azure_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type azureFetcherConfig struct {
ResourceGroup string
AzureClientGetter azureClientGetter
DiscoveryConfigName string
Integration string
}

type azureInstanceFetcher struct {
Expand All @@ -133,6 +134,7 @@ type azureInstanceFetcher struct {
Parameters map[string]string
ClientID string
DiscoveryConfigName string
Integration string
}

func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher {
Expand All @@ -143,6 +145,7 @@ func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher {
ResourceGroup: cfg.ResourceGroup,
Labels: cfg.Matcher.ResourceTags,
DiscoveryConfigName: cfg.DiscoveryConfigName,
Integration: cfg.Integration,
}

if cfg.Matcher.Params != nil {
Expand All @@ -165,6 +168,12 @@ func (f *azureInstanceFetcher) GetDiscoveryConfigName() string {
return f.DiscoveryConfigName
}

// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
func (f *azureInstanceFetcher) IntegrationName() string {
return f.Integration
}

// GetInstances fetches all Azure virtual machines matching configured filters.
func (f *azureInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) {
client, err := f.AzureClientGetter.GetAzureVirtualMachinesClient(f.Subscription)
Expand Down
6 changes: 6 additions & 0 deletions lib/srv/server/ec2_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,9 @@ func (f *ec2InstanceFetcher) GetInstances(ctx context.Context, rotation bool) ([
func (f *ec2InstanceFetcher) GetDiscoveryConfigName() string {
return f.DiscoveryConfigName
}

// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
func (f *ec2InstanceFetcher) IntegrationName() string {
return f.Integration
}
22 changes: 16 additions & 6 deletions lib/srv/server/gcp_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type gcpFetcherConfig struct {
GCPClient gcp.InstancesClient
projectsClient gcp.ProjectsClient
DiscoveryConfigName string
Integration string
}

type gcpInstanceFetcher struct {
Expand All @@ -123,16 +124,19 @@ type gcpInstanceFetcher struct {
Parameters map[string]string
projectsClient gcp.ProjectsClient
DiscoveryConfigName string
Integration string
}

func newGCPInstanceFetcher(cfg gcpFetcherConfig) *gcpInstanceFetcher {
fetcher := &gcpInstanceFetcher{
GCP: cfg.GCPClient,
Zones: cfg.Matcher.Locations,
ProjectIDs: cfg.Matcher.ProjectIDs,
ServiceAccounts: cfg.Matcher.ServiceAccounts,
Labels: cfg.Matcher.GetLabels(),
projectsClient: cfg.projectsClient,
GCP: cfg.GCPClient,
Zones: cfg.Matcher.Locations,
ProjectIDs: cfg.Matcher.ProjectIDs,
ServiceAccounts: cfg.Matcher.ServiceAccounts,
Labels: cfg.Matcher.GetLabels(),
projectsClient: cfg.projectsClient,
Integration: cfg.Integration,
DiscoveryConfigName: cfg.DiscoveryConfigName,
}
if cfg.Matcher.Params != nil {
fetcher.Parameters = map[string]string{
Expand All @@ -152,6 +156,12 @@ func (f *gcpInstanceFetcher) GetDiscoveryConfigName() string {
return f.DiscoveryConfigName
}

// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
func (f *gcpInstanceFetcher) IntegrationName() string {
return f.Integration
}

// GetInstances fetches all GCP virtual machines matching configured filters.
func (f *gcpInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) {
// Key by project ID, then by zone.
Expand Down
4 changes: 4 additions & 0 deletions lib/srv/server/ssm_install.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ func (si *SSMInstaller) Run(ctx context.Context, req SSMRunRequest) error {
validInstances = instancesState.valid
}

if len(validInstances) == 0 {
return nil
}

validInstanceIDs := instanceIDsFrom(validInstances)
output, err := req.SSM.SendCommandWithContext(ctx, &ssm.SendCommandInput{
DocumentName: aws.String(req.DocumentName),
Expand Down
3 changes: 3 additions & 0 deletions lib/srv/server/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Fetcher interface {
// GetDiscoveryConfigName returns the DiscoveryConfig name that created this fetcher.
// Empty for Fetchers created from `teleport.yaml/discovery_service.aws.<Matcher>` matchers.
GetDiscoveryConfigName() string
// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
IntegrationName() string
}

// WithTriggerFetchC sets a poll trigger to manual start a resource polling.
Expand Down
Loading

0 comments on commit 083798c

Please sign in to comment.