Skip to content

Commit

Permalink
Fix TestManager_FakeInput_APM (elastic#3731)
Browse files (browse the repository at this point in the history)
* rewrite TestManager_FakeInput_APM

* retry Actions better

* test rework in progress

* finish reworking fakeinput apm test

---------

Co-authored-by: Pierre HILBERT <[email protected]>
  • Loading branch information
faec and pierrehilbert authored Dec 5, 2023
1 parent 2c04c13 commit dabea2e
Showing 1 changed file with 117 additions and 211 deletions.
328 changes: 117 additions & 211 deletions pkg/component/runtime/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ func TestManager_FakeInput_Features(t *testing.T) {
func TestManager_FakeInput_APM(t *testing.T) {
testPaths(t)

ctx, cancel := context.WithCancel(context.Background())
timeout := 30 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

agentInfo, _ := info.NewAgentInfo(ctx, true)
Expand Down Expand Up @@ -550,8 +551,6 @@ func TestManager_FakeInput_APM(t *testing.T) {

subscriptionCtx, subCancel := context.WithCancel(context.Background())
defer subCancel()
subscriptionErrCh := make(chan error)
doneCh := make(chan struct{})

initialAPMConfig := &proto.APMConfig{
Elastic: &proto.ElasticAPM{
Expand Down Expand Up @@ -581,245 +580,152 @@ func TestManager_FakeInput_APM(t *testing.T) {
},
}

go func() {
sub := m.Subscribe(subscriptionCtx, compID)
var healthIteration int
var retrievedApmConfig *proto.APMConfig
for {
select {
case <-subscriptionCtx.Done():
return
case componentState := <-sub.Ch():
t.Logf("component state changed: %+v", componentState)
sub := m.Subscribe(subscriptionCtx, compID)

if componentState.State == client.UnitStateFailed {
subscriptionErrCh <- fmt.Errorf("component failed: %s", componentState.Message)
return
}

unit, ok := componentState.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}]
if !ok {
subscriptionErrCh <- errors.New("unit missing: fake-input")
return
}

switch unit.State {
case client.UnitStateFailed:
subscriptionErrCh <- fmt.Errorf("unit failed: %s", unit.Message)

case client.UnitStateHealthy:
healthIteration++
t.Logf("Healthy iteration %d starting at %s", healthIteration, time.Now())
switch healthIteration {
case 1: // yes, it's starting on 1
comp.Component = &proto.Component{
ApmConfig: initialAPMConfig,
}
m.Update(component.Model{Components: []component.Component{comp}})
err := <-m.errCh
if err != nil {
subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w",
healthIteration, err)
return
}
m.Update(component.Model{Components: []component.Component{comp}})
err = <-m.errCh
require.NoError(t, err, "manager Update call must succeed")

// testStep tracks how far into the test sequence we've progressed.
// 0: When unit is healthy, set initialAPMConfig
// 1: When initialAPMConfig is active, set modifiedAPMConfig
// 2: When modifiedAPMConfig is active, clear all APMConfig
// 3: When APM config is empty again, succeed
var testStep int
STATELOOP:
for {
select {
case <-ctx.Done():
require.Fail(t, "timed out waiting for state update")
case componentState := <-sub.Ch():
t.Logf("component state changed: %+v", componentState)

// check if config sent on iteration 1 was set
case 2:
// In the previous iteration, the (fake) component has received a CheckinExpected
// message to propagate the APM configuration. In this iteration we are about to
// retrieve the APM configuration from the same component via the retrieve_apm_config
// action. Within the component, which is running as a separate process, actions
// and CheckinExpected messages are processed concurrently. We need some way to wait
// a reasonably short amount of time for the CheckinExpected message to be applied by the
// component (thus setting the APM config) before we query the same component
// for apm config information. We accomplish this via assert.Eventually.
// We also send a modified APM config to see that the component updates correctly and
// reports the new config in the next iteration.
assert.Eventuallyf(t, func() bool {
// check the component
res, err := m.PerformAction(
context.Background(),
comp,
comp.Units[0],
fakecmp.ActionRetrieveAPMConfig,
nil)
if err != nil {
subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w",
healthIteration, fakecmp.ActionRetrieveAPMConfig, err)
return false
}
retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res)
if err != nil {
subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w",
healthIteration, fakecmp.ActionRetrieveAPMConfig, err)
return false
}
return gproto.Equal(initialAPMConfig, retrievedApmConfig)
}, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: %s actual: %s", initialAPMConfig, retrievedApmConfig)
require.NotEqual(t, client.UnitStateFailed, componentState.State, "component failed: %v", componentState.Message)

comp.Component = &proto.Component{
ApmConfig: modifiedAPMConfig,
}
m.Update(component.Model{Components: []component.Component{comp}})
err := <-m.errCh
if err != nil {
subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w",
healthIteration, err)
return
}
// Set a new APM config to check that we update correctly
case 3:
// In the previous iteration, the (fake) component has received another CheckinExpected
// message to propagate a modified APM configuration. In this iteration we are about to
// retrieve the APM configuration from the same component via the retrieve_apm_config
// action.
assert.Eventuallyf(t, func() bool {
// check the component
res, err := m.PerformAction(
context.Background(),
comp,
comp.Units[0],
fakecmp.ActionRetrieveAPMConfig,
nil)
if err != nil {
subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w",
healthIteration, fakecmp.ActionRetrieveAPMConfig, err)
return false
}
unit, ok := componentState.Units[ComponentUnitKey{UnitType: client.UnitTypeInput, UnitID: "fake-input"}]
require.True(t, ok, "input unit missing: fake-input")

retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res)
if err != nil {
subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w",
healthIteration, fakecmp.ActionRetrieveAPMConfig, err)
return false
}
if unit.State == client.UnitStateStarting || unit.State == client.UnitStateConfiguring {
// Unit is still starting or reconfiguring, skip to next update
continue STATELOOP
}

return gproto.Equal(modifiedAPMConfig, retrievedApmConfig)
}, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: %s actual: %s", modifiedAPMConfig, retrievedApmConfig)
require.Equal(t, client.UnitStateHealthy, unit.State, "unit isn't healthy: %v", unit.Message)

comp.Component = &proto.Component{
ApmConfig: nil,
}
m.Update(component.Model{Components: []component.Component{comp}})
err := <-m.errCh
if err != nil {
subscriptionErrCh <- fmt.Errorf("[case %d]: failed to update component: %w",
healthIteration, err)
return
}
t.Logf("Healthy iteration %d starting at %s", testStep, time.Now())
switch testStep {
case 0:
// Add an APM config to the component config and send an update.
comp.Component = &proto.Component{
ApmConfig: initialAPMConfig,
}
m.Update(component.Model{Components: []component.Component{comp}})
err = <-m.errCh
require.NoError(t, err, "manager Update call must succeed")

case 1:
// First, check that the APM config set in the previous step is
// visible, if not then we need to wait for a future update
if componentState.Component == nil {
continue STATELOOP
}

case 4:
// In the previous iteration, the (fake) component has received another CheckinExpected
// message to propagate a nil APM configuration. In this iteration we are about to
// retrieve the APM configuration from the same component via the retrieve_apm_config
// action.
assert.Eventuallyf(t, func() bool {
// check the component
res, err := m.PerformAction(
context.Background(),
comp,
comp.Units[0],
fakecmp.ActionRetrieveAPMConfig,
nil)
if err != nil {
subscriptionErrCh <- fmt.Errorf("[case %d]: failed to PerformAction %s: %w",
healthIteration, fakecmp.ActionRetrieveAPMConfig, err)
return false
}
// The APM config has propagated to the component state, now make sure
// it's visible when retrieving via action.
// We use require.Eventually because the new value isn't guaranteed
// to immediately propagate via Action even after it appears in the
// component checkin.

retrievedApmConfig, err = extractAPMConfigFromActionResult(t, res)
if err != nil {
subscriptionErrCh <- fmt.Errorf("[case %d]: failed to retrieve APM Config from ActionResult %s: %w",
healthIteration, fakecmp.ActionRetrieveAPMConfig, err)
return false
}
return retrievedApmConfig == nil
}, 1*time.Second, 100*time.Millisecond, "APM config was not received by component. expected: nil actual: %s", retrievedApmConfig)
require.Eventually(t,
func() bool {
retrievedAPMConfig := fetchAPMConfigWithAction(t, ctx, m, comp)
return gproto.Equal(initialAPMConfig, retrievedAPMConfig)
},
3*time.Second,
50*time.Millisecond,
"Updated APM config should be reported by Actions")

doneCh <- struct{}{}
}
// Config matches, we now try updating to a new APM config
comp.Component = &proto.Component{
ApmConfig: modifiedAPMConfig,
}
m.Update(component.Model{Components: []component.Component{comp}})
err = <-m.errCh
require.NoError(t, err, "manager Update call must succeed")

case client.UnitStateStarting:
// acceptable
case 2:
require.NotNil(t, componentState.Component, "ApmConfig must not be nil")

case client.UnitStateConfiguring:
// set unit back to healthy, so other cases will run.
comp.Units[0].Config = component.MustExpectedConfig(map[string]interface{}{
"type": "fake",
"state": int(client.UnitStateHealthy),
"message": "Fake Healthy",
})
require.Eventually(t,
func() bool {
retrievedAPMConfig := fetchAPMConfigWithAction(t, ctx, m, comp)
return gproto.Equal(modifiedAPMConfig, retrievedAPMConfig)
},
3*time.Second,
50*time.Millisecond,
"Updated APM config should be reported by Actions")

m.Update(component.Model{Components: []component.Component{comp}})
err := <-m.errCh
if err != nil {
t.Logf("error updating component state to health: %v", err)
// Both configs were reported correctly, now clear the APM config
comp.Component = &proto.Component{
ApmConfig: nil,
}

subscriptionErrCh <- fmt.Errorf("failed to update component: %w", err)
}
m.Update(component.Model{Components: []component.Component{comp}})
err = <-m.errCh
require.NoError(t, err, "manager Update call must succeed")

default:
// unexpected state that should not have occurred
subscriptionErrCh <- fmt.Errorf("unit reported unexpected state: %v",
unit.State)
case 3:
if componentState.Component != nil && componentState.Component.ApmConfig != nil {
// APM config is still present, wait for next update
continue STATELOOP
}

require.Eventually(t,
func() bool {
retrievedAPMConfig := fetchAPMConfigWithAction(t, ctx, m, comp)
return retrievedAPMConfig == nil
},
3*time.Second,
50*time.Millisecond,
"Final APM config should be nil")

// Success, end the loop
break STATELOOP
}
testStep++
}
}()
}

defer drainErrChan(managerErrCh)
defer drainErrChan(subscriptionErrCh)
subCancel()
cancel()

m.Update(component.Model{Components: []component.Component{comp}})
err = <-m.errCh
err = <-managerErrCh
require.NoError(t, err)

timeout := 30 * time.Second
timeoutTimer := time.NewTimer(timeout)
defer timeoutTimer.Stop()

// Wait for a success, an error or time out
for {
select {
case <-timeoutTimer.C:
t.Fatalf("timed out after %s", timeout)
case err := <-managerErrCh:
require.NoError(t, err)
case err := <-subscriptionErrCh:
require.NoError(t, err)
case <-doneCh:
subCancel()
cancel()

err = <-managerErrCh
require.NoError(t, err)
return
}
}
}

// NOTE(review): this span is a diff rendered without +/- markers. The removed
// helper extractAPMConfigFromActionResult and the helper that replaces it,
// fetchAPMConfigWithAction, appear interleaved, so the text is not compilable
// as shown. Lines returning (value, error) pairs belong to the removed helper;
// lines using require belong to the new one.
//
// fetchAPMConfigWithAction performs fakecmp.ActionRetrieveAPMConfig against
// comp.Units[0] through m.PerformAction and returns the APM config decoded from
// the "apm" key of the action result, or nil when that key holds a nil value.
// Every intermediate step is checked with a require assertion on t instead of
// being returned as an error.
//
// NOTE(review): the ctx parameter is not referenced in the body; PerformAction
// is called with context.Background() instead - confirm whether ctx was meant
// to be passed through.
func extractAPMConfigFromActionResult(t *testing.T, res map[string]interface{}) (*proto.APMConfig, error) {
func fetchAPMConfigWithAction(t *testing.T, ctx context.Context, m *Manager, comp component.Component) *proto.APMConfig {
res, err := m.PerformAction(
context.Background(),
comp,
comp.Units[0],
fakecmp.ActionRetrieveAPMConfig,
nil)
require.NoError(t, err, "failed to retrieve APM config")

// The action result is expected to carry the config under the "apm" key.
apmCfg, ok := res["apm"]
if !ok {
return nil, fmt.Errorf("ActionResult for %s does not contain top level key %s", fakecmp.ActionRetrieveAPMConfig, "apm")
}
require.True(t, ok, "ActionResult must contain top-level 'apm' key")
if apmCfg == nil {
// the APM config is not set on the component
return nil, nil
return nil
}

// A non-nil value must be a string; it is parsed below as protobuf JSON.
jsonApmConfig, ok := apmCfg.(string)
if !ok {
return nil, fmt.Errorf("ActionResult for %s does not contain a string value: %T", fakecmp.ActionRetrieveAPMConfig, apmCfg)
}
require.True(t, ok, "'apm' key must contain a string")

retrievedApmConfig := new(proto.APMConfig)
err := protojson.Unmarshal([]byte(jsonApmConfig), retrievedApmConfig)
if err != nil {
return nil, fmt.Errorf("error unmarshaling apmconfig %s: %w", jsonApmConfig, err)
}
return retrievedApmConfig, nil
err = protojson.Unmarshal([]byte(jsonApmConfig), retrievedApmConfig)
// NOTE(review): the message below has no format verb although jsonApmConfig
// is passed as an extra argument - confirm how the assertion renders it.
require.NoError(t, err, "'apm' key must contain valid json", jsonApmConfig)
return retrievedApmConfig
}

func TestManager_FakeInput_Limits(t *testing.T) {
Expand Down

0 comments on commit dabea2e

Please sign in to comment.