Skip to content

Commit

Permalink
refactor tests to reuse stack and Elastic-Agent
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Oct 13, 2023
1 parent 92b25f5 commit 04579e4
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 76 deletions.
91 changes: 35 additions & 56 deletions testing/integration/datastreams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"net/http/httputil"
"os"
"path/filepath"
"regexp"
"strings"
"testing"
"text/template"
Expand All @@ -27,24 +28,21 @@ import (
"github.com/elastic/elastic-agent/pkg/control/v2/client"
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools"
"github.com/elastic/elastic-agent/pkg/testing/tools/estools"
"github.com/elastic/elastic-agent/version"
"github.com/elastic/elastic-transport-go/v8/elastictransport"
)

func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
info := define.Require(t, define.Requirements{
Local: false,
Stack: &define.Stack{
Version: version.Agent + "-SNAPSHOT",
},
Sudo: true,
})

func testFlattenedDatastreamFleetPolicy(
t *testing.T,
ctx context.Context,
info *define.Info,
agentFixture *atesting.Fixture,
policy kibana.PolicyResponse,
) {
dsType := "logs"
dsNamespace := strings.ToLower(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64()))
dsDataset := strings.ToLower(fmt.Sprintf("%s-dataset", t.Name()))
dsNamespace := cleanString(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64()))
dsDataset := cleanString(fmt.Sprintf("%s-dataset", t.Name()))
numEvents := 60

tempDir := t.TempDir()
Expand All @@ -56,50 +54,16 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
t.Fatalf("could not create new fixture: %s", err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 1. Create a policy in Fleet with monitoring enabled.
// To ensure there are no conflicts with previous test runs against
// the same ESS stack, we add the current time at the end of the policy
// name. This policy does not contain any integration.
createPolicyReq := kibana.AgentPolicy{
Name: t.Name() + "--" + time.Now().Format(time.RFC3339Nano),
Namespace: info.Namespace,
Description: "Test policy for " + t.Name(),
MonitoringEnabled: []kibana.MonitoringEnabledOption{
kibana.MonitoringEnabledLogs,
kibana.MonitoringEnabledMetrics,
},
IsProtected: false,
}
installOpts := atesting.InstallOpts{
NonInteractive: true,
Force: true,
}

// 2. Install the Elastic-Agent with the policy that
// was just created.
policy, err := tools.InstallAgentWithPolicy(ctx,
t,
installOpts,
agentFixture,
info.KibanaClient,
createPolicyReq)
if err != nil {
t.Fatalf("could not install Elastic-AGent with Policy: %s", err)
}

// 3. Prepare a request to add an integration to the policy
// 1. Prepare a request to add an integration to the policy
tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(policyJSON)
if err != nil {
t.Fatalf("cannot parse template: %s", err)
}

// The time here ensures there are no conflicts with the integration name
// in Fleet.
agentPolicyBuffer := bytes.Buffer{}
err = tmpl.Execute(&agentPolicyBuffer, plolicyVars{
agentPolicyBuilder := strings.Builder{}
err = tmpl.Execute(&agentPolicyBuilder, plolicyVars{
Name: "Log-Input-" + t.Name() + "-" + time.Now().Format(time.RFC3339),
PolicyID: policy.ID,
LogFilePath: logFilePath,
Expand All @@ -109,15 +73,17 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
if err != nil {
t.Fatalf("could not render template: %s", err)
}
// We keep a copy of the policy for debugging prurposes
agentPolicy := agentPolicyBuilder.String()

// 4. Call Kibana to create the policy.
// 2. Call Kibana to create the policy.
// Docs: https://www.elastic.co/guide/en/fleet/current/fleet-api-docs.html#create-integration-policy-api
resp, err := info.KibanaClient.Connection.Send(
http.MethodPost,
"/api/fleet/package_policies",
nil,
nil,
&agentPolicyBuffer)
bytes.NewBufferString(agentPolicy))
if err != nil {
t.Fatalf("could not execute request to Kibana/Fleet: %s", err)
}
Expand All @@ -130,14 +96,20 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
if err != nil {
t.Fatalf("could not dump error response from Kibana: %s", err)
}
t.Log("Kibana error response")
// Make debugging as easy as possible
t.Log("================================================================================")
t.Log("Kibana error response:")
t.Log(string(respDump))
t.Log("================================================================================")
t.Log("Rendered policy:")
t.Log(agentPolicy)
t.Log("================================================================================")
t.FailNow()
}

require.Eventually(
t,
ensureDocumentsInES(t, context.TODO(), info.ESClient, dsType, dsDataset, dsNamespace, int(numEvents)),
ensureDocumentsInES(t, ctx, info.ESClient, dsType, dsDataset, dsNamespace, numEvents),
120*time.Second,
time.Second,
"could not get all expected documents form ES")
Expand All @@ -152,6 +124,7 @@ func TestFlattenedDatastreamStandalone(t *testing.T) {
Sudo: true,
})

ctx := context.Background()
dsType := "logs"
dsNamespace := fmt.Sprintf("%s-namespace-%d", t.Name(), rand.Uint64())
dsDataset := fmt.Sprintf("%s-dataset", t.Name())
Expand Down Expand Up @@ -188,12 +161,12 @@ func TestFlattenedDatastreamStandalone(t *testing.T) {
})

// 1. The first thing to do is to prepare the fixture.
if err := agentFixture.Prepare(context.Background()); err != nil {
if err := agentFixture.Prepare(ctx); err != nil {
t.Fatalf("cannot prepare Elastic-Agent: %s", err)
}

// 2. Create a context with cancel to easily stop the Elastic-Agent
runCtx, cancelAgentRunCtx := context.WithCancel(context.Background())
runCtx, cancelAgentRunCtx := context.WithCancel(ctx)
go func() {
// make sure the test does not hang forever
time.Sleep(90 * time.Second)
Expand Down Expand Up @@ -272,7 +245,7 @@ func ensureDocumentsInES(
t.Logf("error quering ES, will retry later: %s", err)
}

if docs.Hits.Total.Value == int(numEvents) {
if docs.Hits.Total.Value == numEvents {
return true
}

Expand Down Expand Up @@ -333,6 +306,12 @@ func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events i
}()
}

var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9 ]+`)

func cleanString(s string) string {
return nonAlphanumericRegex.ReplaceAllString(strings.ToLower(s), "")
}

type plolicyVars struct {
Name string
PolicyID string
Expand Down
54 changes: 34 additions & 20 deletions testing/integration/monitoring_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
)

func TestMonitoringLogsShipped(t *testing.T) {
func TestLogIngestionFleetManaged(t *testing.T) {
info := define.Require(t, define.Requirements{
OS: []define.OS{{Type: define.Linux}},
Stack: &define.Stack{},
Expand All @@ -37,12 +37,13 @@ func TestMonitoringLogsShipped(t *testing.T) {
})
ctx := context.Background()

t.Logf("got namespace: %s", info.Namespace)
t.Skip("Test is flaky; see https://github.com/elastic/elastic-agent/issues/3081")

agentFixture, err := define.NewFixture(t, define.Version())
require.NoError(t, err)

// 1. Create a policy in Fleet with monitoring enabled.
// To ensure there are no conflicts with previous test runs against
// the same ESS stack, we add the current time at the end of the policy
// name. This policy does not contain any integration.
t.Log("Enrolling agent in Fleet with a test policy")
createPolicyReq := kibana.AgentPolicy{
Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()),
Expand All @@ -60,29 +61,41 @@ func TestMonitoringLogsShipped(t *testing.T) {
},
}

// Stage 1: Install
// As part of the cleanup process, we'll uninstall the agent
installOpts := atesting.InstallOpts{
NonInteractive: true,
Force: true,
}
policy, err := tools.InstallAgentWithPolicy(ctx, t,
installOpts, agentFixture, info.KibanaClient, createPolicyReq)

// 2. Install the Elastic-Agent with the policy that
// was just created.
policy, err := tools.InstallAgentWithPolicy(
ctx,
t,
installOpts,
agentFixture,
info.KibanaClient,
createPolicyReq)
require.NoError(t, err)
t.Logf("created policy: %s", policy.ID)

check.ConnectedToFleet(t, agentFixture, 5*time.Minute)

// Stage 2: check indices
// This is mostly for debugging
resp, err := estools.GetAllindicies(info.ESClient)
require.NoError(t, err)
for _, run := range resp {
t.Logf("%s: %d/%d deleted: %d\n",
run.Index, run.DocsCount, run.StoreSizeBytes, run.DocsDeleted)
}
t.Run("Monitoring logs are shipped", func(t *testing.T) {
testMonitoringLogsAreShipped(t, ctx, info, agentFixture, policy)
})

t.Run("Normal logs with flattened data_stream are shipped", func(t *testing.T) {
testFlattenedDatastreamFleetPolicy(t, ctx, info, agentFixture, policy)
})
}

// Stage 3: Make sure metricbeat logs are populated
func testMonitoringLogsAreShipped(
t *testing.T,
ctx context.Context,
info *define.Info,
agentFixture *atesting.Fixture,
policy kibana.PolicyResponse,
) {
// Stage 1: Make sure metricbeat logs are populated
t.Log("Making sure metricbeat logs are populated")
docs := findESDocs(t, func() (estools.Documents, error) {
return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat")
Expand All @@ -101,15 +114,15 @@ func TestMonitoringLogsShipped(t *testing.T) {
c.Name, client.Healthy, client.State(c.State))
}

// Stage 5: Make sure we have message confirming central management is running
// Stage 2: Make sure we have message confirming central management is running
t.Log("Making sure we have message confirming central management is running")
docs = findESDocs(t, func() (estools.Documents, error) {
return estools.FindMatchingLogLines(info.ESClient, info.Namespace,
"Parsed configuration and determined agent is managed by Fleet")
})
require.NotZero(t, len(docs.Hits.Hits))

// Stage 6: verify logs from the monitoring components are not sent to the output
// Stage 3: verify logs from the monitoring components are not sent to the output
t.Log("Check monitoring logs")
hostname, err := os.Hostname()
if err != nil {
Expand All @@ -123,6 +136,7 @@ func TestMonitoringLogsShipped(t *testing.T) {
// We cannot search for `component.id` because at the moment of writing
// this field is not mapped. There is an issue for that:
// https://github.com/elastic/integrations/issues/6545
// TODO: use runtime fields while the above issue is not resolved.

docs = findESDocs(t, func() (estools.Documents, error) {
return estools.GetLogsForAgentID(info.ESClient, agentID)
Expand Down

0 comments on commit 04579e4

Please sign in to comment.