diff --git a/pkg/component/runtime/manager_test.go b/pkg/component/runtime/manager_test.go index 51bb941bca6..612a85b3eb0 100644 --- a/pkg/component/runtime/manager_test.go +++ b/pkg/component/runtime/manager_test.go @@ -495,7 +495,8 @@ func TestManager_FakeInput_Features(t *testing.T) { func TestManager_FakeInput_APM(t *testing.T) { testPaths(t) - ctx, cancel := context.WithCancel(context.Background()) + timeout := 30 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() agentInfo, _ := info.NewAgentInfo(ctx, true) @@ -550,8 +551,6 @@ func TestManager_FakeInput_APM(t *testing.T) { subscriptionCtx, subCancel := context.WithCancel(context.Background()) defer subCancel() - subscriptionErrCh := make(chan error) - doneCh := make(chan struct{}) initialAPMConfig := &proto.APMConfig{ Elastic: &proto.ElasticAPM{ @@ -581,245 +580,152 @@ func TestManager_FakeInput_APM(t *testing.T) { }, } - go func() { - sub := m.Subscribe(subscriptionCtx, compID) - var healthIteration int - var retrievedApmConfig *proto.APMConfig - for { - select { - case <-subscriptionCtx.Done(): - return - case componentState := <-sub.Ch(): - t.Logf("component state changed: %+v", componentState) + sub := m.Subscribe(subscriptionCtx, compID) - if componentState.State == client.UnitStateFailed { - subscriptionErrCh <- fmt.Errorf("component failed: %s", componentState.Message) - return - } - - unit, ok := componentState.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] - if !ok { - subscriptionErrCh <- errors.New("unit missing: fake-input") - return - } - - switch unit.State { - case client.UnitStateFailed: - subscriptionErrCh <- fmt.Errorf("unit failed: %s", unit.Message) - - case client.UnitStateHealthy: - healthIteration++ - t.Logf("Healthy iteration %d starting at %s", healthIteration, time.Now()) - switch healthIteration { - case 1: // yes, it's starting on 1 - comp.Component = &proto.Component{ - ApmConfig: initialAPMConfig, - } - m.Update(component.Model{Components: []component.Component{comp}}) - err := <-m.errCh - if err != nil { - subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", - healthIteration, err) - return - } + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh + require.NoError(t, err, "manager Update call must succeed") + + // testStep tracks how far into the test sequence we've progressed. + // 0: When unit is healthy, set initialAPMConfig + // 1: When initialAPMConfig is active, set modifiedAPMConfig + // 2: When modifiedAPMConfig is active, clear all APMConfig + // 3: When APM config is empty again, succeed + var testStep int +STATELOOP: + for { + select { + case <-ctx.Done(): + require.Fail(t, "timed out waiting for state update") + case componentState := <-sub.Ch(): + t.Logf("component state changed: %+v", componentState) - // check if config sent on iteration 1 was set - case 2: - // In the previous iteration, the (fake) component has received a CheckinExpected - // message to propagate the APM configuration. In this iteration we are about to - // retrieve the APM configuration from the same component via the retrieve_apm_config - // action. Within the component, which is running as a separate process, actions - // and CheckinExpected messages are processed concurrently. We need some way to wait - // a reasonably short amount of time for the CheckinExpected message to be applied by the - // component (thus setting the APM config) before we query the same component - // for apm config information. We accomplish this via assert.Eventually. - // We also send a modified APM config to see that the component updates correctly and - // reports the new config in the next iteration. - assert.Eventuallyf(t, func() bool { - // check the component - res, err := m.PerformAction( - context.Background(), - comp, - comp.Units[0], - fakecmp.ActionRetrieveAPMConfig, - nil) - if err != nil { - subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w", - healthIteration, fakecmp.ActionRetrieveAPMConfig, err) - return false - } - retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res) - if err != nil { - subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w", - healthIteration, fakecmp.ActionRetrieveAPMConfig, err) - return false - } - return gproto.Equal(initialAPMConfig, retrievedApmConfig) - }, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: %s actual: %s", initialAPMConfig, retrievedApmConfig) + require.NotEqual(t, client.UnitStateFailed, componentState.State, "component failed: %v", componentState.Message) - comp.Component = &proto.Component{ - ApmConfig: modifiedAPMConfig, - } - m.Update(component.Model{Components: []component.Component{comp}}) - err := <-m.errCh - if err != nil { - subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", - healthIteration, err) - return - } - // Set a new APM config to check that we update correctly - case 3: - // In the previous iteration, the (fake) component has received another CheckinExpected - // message to propagate a modified APM configuration. In this iteration we are about to - // retrieve the APM configuration from the same component via the retrieve_apm_config - // action. - assert.Eventuallyf(t, func() bool { - // check the component - res, err := m.PerformAction( - context.Background(), - comp, - comp.Units[0], - fakecmp.ActionRetrieveAPMConfig, - nil) - if err != nil { - subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w", - healthIteration, fakecmp.ActionRetrieveAPMConfig, err) - return false - } + unit, ok := componentState.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}] + require.True(t, ok, "input unit missing: fake-input") - retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res) - if err != nil { - subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w", - healthIteration, fakecmp.ActionRetrieveAPMConfig, err) - return false - } + if unit.State == client.UnitStateStarting || unit.State == client.UnitStateConfiguring { + // Unit is still starting or reconfiguring, skip to next update + continue STATELOOP + } - return gproto.Equal(modifiedAPMConfig, retrievedApmConfig) - }, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: %s actual: %s", modifiedAPMConfig, retrievedApmConfig) + require.Equal(t, client.UnitStateHealthy, unit.State, "unit isn't healthy: %v", unit.Message) - comp.Component = &proto.Component{ - ApmConfig: nil, - } - m.Update(component.Model{Components: []component.Component{comp}}) - err := <-m.errCh - if err != nil { - subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w", - healthIteration, err) - return - } + t.Logf("Healthy iteration %d starting at %s", testStep, time.Now()) + switch testStep { + case 0: + // Add an APM config to the component config and send an update. + comp.Component = &proto.Component{ + ApmConfig: initialAPMConfig, + } + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh + require.NoError(t, err, "manager Update call must succeed") + + case 1: + // First, check that the APM config set in the previous step is + // visible, if not then we need to wait for a future update + if componentState.Component == nil { + continue STATELOOP + } - case 4: - // In the previous iteration, the (fake) component has received another CheckinExpected - // message to propagate a nil APM configuration. In this iteration we are about to - // retrieve the APM configuration from the same component via the retrieve_apm_config - // action. - assert.Eventuallyf(t, func() bool { - // check the component - res, err := m.PerformAction( - context.Background(), - comp, - comp.Units[0], - fakecmp.ActionRetrieveAPMConfig, - nil) - if err != nil { - subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w", - healthIteration, fakecmp.ActionRetrieveAPMConfig, err) - return false - } + // The APM config has propagated to the component state, now make sure + // it's visible when retrieving via action. + // We use require.Eventually because the new value isn't guaranteed + // to immediately propagate via Action even after it appears in the + // component checkin. - retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res) - if err != nil { - subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w", - healthIteration, fakecmp.ActionRetrieveAPMConfig, err) - return false - } - return retrievedApmConfig == nil - }, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: nil actual: %s", retrievedApmConfig) + require.Eventually(t, + func() bool { + retrievedAPMConfig := fetchAPMConfigWithAction(t, ctx, m, comp) + return gproto.Equal(initialAPMConfig, retrievedAPMConfig) + }, + 3*time.Second, + 50*time.Millisecond, + "Updated APM config should be reported by Actions") - doneCh <- struct{}{} - } + // Config matches, we now try updating to a new APM config + comp.Component = &proto.Component{ + ApmConfig: modifiedAPMConfig, + } + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh + require.NoError(t, err, "manager Update call must succeed") - case client.UnitStateStarting: - // acceptable + case 2: + require.NotNil(t, componentState.Component, "ApmConfig must not be nil") - case client.UnitStateConfiguring: - // set unit back to healthy, so other cases will run. - comp.Units[0].Config = component.MustExpectedConfig(map[string]interface{}{ - "type": "fake", - "state": int(client.UnitStateHealthy), - "message": "Fake Healthy", - }) + require.Eventually(t, + func() bool { + retrievedAPMConfig := fetchAPMConfigWithAction(t, ctx, m, comp) + return gproto.Equal(modifiedAPMConfig, retrievedAPMConfig) + }, + 3*time.Second, + 50*time.Millisecond, + "Updated APM config should be reported by Actions") - m.Update(component.Model{Components: []component.Component{comp}}) - err := <-m.errCh - if err != nil { - t.Logf("error updating component state to health: %v", err) + // Both configs were reported correctly, now clear the APM config + comp.Component = &proto.Component{ + ApmConfig: nil, + } - subscriptionErrCh <- fmt.Errorf("failed to update component: %w", err) - } + m.Update(component.Model{Components: []component.Component{comp}}) + err = <-m.errCh + require.NoError(t, err, "manager Update call must succeed") - default: - // unexpected state that should not have occurred - subscriptionErrCh <- fmt.Errorf("unit reported unexpected state: %v", - unit.State) + case 3: + if componentState.Component != nil && componentState.Component.ApmConfig != nil { + // APM config is still present, wait for next update + continue STATELOOP } + require.Eventually(t, + func() bool { + retrievedAPMConfig := fetchAPMConfigWithAction(t, ctx, m, comp) + return retrievedAPMConfig == nil + }, + 3*time.Second, + 50*time.Millisecond, + "Final APM config should be nil") + + // Success, end the loop + break STATELOOP } + testStep++ } - }() + } - defer drainErrChan(managerErrCh) - defer drainErrChan(subscriptionErrCh) + subCancel() + cancel() - m.Update(component.Model{Components: []component.Component{comp}}) - err = <-m.errCh + err = <-managerErrCh require.NoError(t, err) - - timeout := 30 * time.Second - timeoutTimer := time.NewTimer(timeout) - defer timeoutTimer.Stop() - - // Wait for a success, an error or time out - for { - select { - case <-timeoutTimer.C: - t.Fatalf("timed out after %s", timeout) - case err := <-managerErrCh: - require.NoError(t, err) - case err := <-subscriptionErrCh: - require.NoError(t, err) - case <-doneCh: - subCancel() - cancel() - - err = <-managerErrCh - require.NoError(t, err) - return - } - } } -func extractAPMConfigFromActionResult(t *testing.T, res map[string]interface{}) (*proto.APMConfig, error) { +func fetchAPMConfigWithAction(t *testing.T, ctx context.Context, m *Manager, comp component.Component) *proto.APMConfig { + res, err := m.PerformAction( + context.Background(), + comp, + comp.Units[0], + fakecmp.ActionRetrieveAPMConfig, + nil) + require.NoError(t, err, "failed to retrieve APM config") + apmCfg, ok := res["apm"] - if !ok { - return nil, fmt.Errorf("ActionResult for %s does not contain top level key %s", fakecmp.ActionRetrieveAPMConfig, "apm") - } + require.True(t, ok, "ActionResult must contain top-level 'apm' key") if apmCfg == nil { // the APM config is not set on the component - return nil, nil + return nil } jsonApmConfig, ok := apmCfg.(string) - if !ok { - return nil, fmt.Errorf("ActionResult for %s does not contain a string value: %T", fakecmp.ActionRetrieveAPMConfig, apmCfg) - } + require.True(t, ok, "'apm' key must contain a string") retrievedApmConfig := new(proto.APMConfig) - err := protojson.Unmarshal([]byte(jsonApmConfig), retrievedApmConfig) - if err != nil { - return nil, fmt.Errorf("error unmarshaling apmconfig %s: %w", jsonApmConfig, err) - } - return retrievedApmConfig, nil + err = protojson.Unmarshal([]byte(jsonApmConfig), retrievedApmConfig) + require.NoError(t, err, "'apm' key must contain valid json", jsonApmConfig) + return retrievedApmConfig } func TestManager_FakeInput_Limits(t *testing.T) {