From 3a1e8d523a09de9eef007acbfe0b9a712a115a50 Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Sat, 9 Dec 2023 10:17:06 +0530 Subject: [PATCH] Fix: qs: allow saving pipelines without connected agents (#4189) * chore: add test validating pipelines can be saved without connected agents * chore: allow pipelines to be saved without connected agents --- pkg/query-service/agentConf/db.go | 8 +- pkg/query-service/agentConf/manager.go | 20 --- pkg/query-service/agentConf/version.go | 2 + .../app/logparsingpipeline/controller.go | 6 - .../integration/logparsingpipeline_test.go | 145 ++++++++++++------ 5 files changed, 106 insertions(+), 75 deletions(-) diff --git a/pkg/query-service/agentConf/db.go b/pkg/query-service/agentConf/db.go index 3369dbe23f..ffbc2f53a8 100644 --- a/pkg/query-service/agentConf/db.go +++ b/pkg/query-service/agentConf/db.go @@ -50,8 +50,8 @@ func (r *Repo) GetConfigHistory( disabled, deploy_status, deploy_result, - last_hash, - last_config + coalesce(last_hash, '') as last_hash, + coalesce(last_config, '{}') as last_config FROM agent_config_versions AS v WHERE element_type = $1 ORDER BY created_at desc, version desc @@ -89,8 +89,8 @@ func (r *Repo) GetConfigVersion( disabled, deploy_status, deploy_result, - last_hash, - last_config + coalesce(last_hash, '') as last_hash, + coalesce(last_config, '{}') as last_config FROM agent_config_versions v WHERE element_type = $1 AND version = $2`, typ, v) diff --git a/pkg/query-service/agentConf/manager.go b/pkg/query-service/agentConf/manager.go index a919185d0d..0e77383f7e 100644 --- a/pkg/query-service/agentConf/manager.go +++ b/pkg/query-service/agentConf/manager.go @@ -172,21 +172,6 @@ func (m *Manager) ReportConfigDeploymentStatus( } } -// Ready indicates if Manager can accept new config update requests -func (mgr *Manager) Ready() bool { - if atomic.LoadUint32(&mgr.lock) != 0 { - return false - } - return opamp.Ready() -} - -// Static methods for working with default manager instance in this module. - -// Ready indicates if Manager can accept new config update requests -func Ready() bool { - return m.Ready() -} - func GetLatestVersion( ctx context.Context, elementType ElementTypeDef, ) (*ConfigVersion, *model.ApiError) { @@ -210,11 +195,6 @@ func StartNewVersion( ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string, ) (*ConfigVersion, *model.ApiError) { - if !m.Ready() { - // agent is already being updated, ask caller to wait and re-try after sometime - return nil, model.UnavailableError(fmt.Errorf("agent updater is busy")) - } - // create a new version cfg := NewConfigversion(eleType) diff --git a/pkg/query-service/agentConf/version.go b/pkg/query-service/agentConf/version.go index 13be5f7cd2..d2bc4547b3 100644 --- a/pkg/query-service/agentConf/version.go +++ b/pkg/query-service/agentConf/version.go @@ -53,6 +53,8 @@ func NewConfigversion(typeDef ElementTypeDef) *ConfigVersion { IsValid: false, Disabled: false, DeployStatus: PendingDeploy, + LastHash: "", + LastConf: "{}", // todo: get user id from context? // CreatedBy } diff --git a/pkg/query-service/app/logparsingpipeline/controller.go b/pkg/query-service/app/logparsingpipeline/controller.go index 7880ac27b7..066123c416 100644 --- a/pkg/query-service/app/logparsingpipeline/controller.go +++ b/pkg/query-service/app/logparsingpipeline/controller.go @@ -73,12 +73,6 @@ func (ic *LogParsingPipelineController) ApplyPipelines( } - if !agentConf.Ready() { - return nil, model.UnavailableError(fmt.Errorf( - "agent updater unavailable at the moment. Please try in sometime", - )) - } - // prepare config elements elements := make([]string, len(pipelines)) for i, p := range pipelines { diff --git a/pkg/query-service/tests/integration/logparsingpipeline_test.go b/pkg/query-service/tests/integration/logparsingpipeline_test.go index 0b4d22973c..4c260596e5 100644 --- a/pkg/query-service/tests/integration/logparsingpipeline_test.go +++ b/pkg/query-service/tests/integration/logparsingpipeline_test.go @@ -367,17 +367,70 @@ func TestLogPipelinesValidation(t *testing.T) { } } +func TestCanSavePipelinesWithoutConnectedAgents(t *testing.T) { + require := require.New(t) + testbed := NewTestbedWithoutOpamp(t) + + getPipelinesResp := testbed.GetPipelinesFromQS() + require.Equal(0, len(getPipelinesResp.Pipelines)) + require.Equal(0, len(getPipelinesResp.History)) + + postablePipelines := logparsingpipeline.PostablePipelines{ + Pipelines: []logparsingpipeline.PostablePipeline{ + { + OrderId: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []logparsingpipeline.PipelineOperator{ + { + OrderId: 1, + ID: "add", + Type: "add", + Field: "attributes.test", + Value: "val", + Enabled: true, + Name: "test add", + }, + }, + }, + }, + } + + testbed.PostPipelinesToQS(postablePipelines) + getPipelinesResp = testbed.GetPipelinesFromQS() + require.Equal(1, len(getPipelinesResp.Pipelines)) + require.Equal(1, len(getPipelinesResp.History)) + +} + // LogPipelinesTestBed coordinates and mocks components involved in // configuring log pipelines and provides test helpers. type LogPipelinesTestBed struct { t *testing.T + testDBFilePath string testUser *model.User apiHandler *app.APIHandler + agentConfMgr *agentConf.Manager opampServer *opamp.Server opampClientConn *opamp.MockOpAmpConnection } -func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed { +func NewTestbedWithoutOpamp(t *testing.T) *LogPipelinesTestBed { // Create a tmp file based sqlite db for testing. testDBFile, err := os.CreateTemp("", "test-signoz-db-*") if err != nil { @@ -408,22 +461,61 @@ func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed { t.Fatalf("could not create a new ApiHandler: %v", err) } - opampServer, clientConn := mockOpampAgent(t, testDBFilePath, controller) - user, apiErr := createTestUser() if apiErr != nil { t.Fatalf("could not create a test user: %v", apiErr) } + // Mock an available opamp agent + testDB, err = opampModel.InitDB(testDBFilePath) + require.Nil(t, err, "failed to init opamp model") + + agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ + DB: testDB, + DBEngine: "sqlite", + AgentFeatures: []agentConf.AgentFeature{ + apiHandler.LogsParsingPipelineController, + }}) + require.Nil(t, err, "failed to init agentConf") + return &LogPipelinesTestBed{ - t: t, - testUser: user, - apiHandler: apiHandler, - opampServer: opampServer, - opampClientConn: clientConn, + t: t, + testDBFilePath: testDBFilePath, + testUser: user, + apiHandler: apiHandler, + agentConfMgr: agentConfMgr, } } +func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed { + testbed := NewTestbedWithoutOpamp(t) + + opampServer := opamp.InitializeServer(nil, testbed.agentConfMgr) + err := opampServer.Start(opamp.GetAvailableLocalAddress()) + require.Nil(t, err, "failed to start opamp server") + + t.Cleanup(func() { + opampServer.Stop() + }) + + opampClientConnection := &opamp.MockOpAmpConnection{} + opampServer.OnMessage( + opampClientConnection, + &protobufs.AgentToServer{ + InstanceUid: "test", + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: newInitialAgentConfigMap(), + }, + }, + ) + + testbed.opampServer = opampServer + testbed.opampClientConn = opampClientConnection + + return testbed + +} + func (tb *LogPipelinesTestBed) PostPipelinesToQSExpectingStatusCode( postablePipelines logparsingpipeline.PostablePipelines, expectedStatusCode int, @@ -668,43 +760,6 @@ func assertPipelinesResponseMatchesPostedPipelines( } } -func mockOpampAgent( - t *testing.T, - testDBFilePath string, - pipelinesController *logparsingpipeline.LogParsingPipelineController, -) (*opamp.Server, *opamp.MockOpAmpConnection) { - // Mock an available opamp agent - testDB, err := opampModel.InitDB(testDBFilePath) - require.Nil(t, err, "failed to init opamp model") - - agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{ - DB: testDB, - DBEngine: "sqlite", - AgentFeatures: []agentConf.AgentFeature{pipelinesController}, - }) - require.Nil(t, err, "failed to init agentConf") - - opampServer := opamp.InitializeServer(nil, agentConfMgr) - err = opampServer.Start(opamp.GetAvailableLocalAddress()) - require.Nil(t, err, "failed to start opamp server") - - t.Cleanup(func() { - opampServer.Stop() - }) - - opampClientConnection := &opamp.MockOpAmpConnection{} - opampServer.OnMessage( - opampClientConnection, - &protobufs.AgentToServer{ - InstanceUid: "test", - EffectiveConfig: &protobufs.EffectiveConfig{ - ConfigMap: newInitialAgentConfigMap(), - }, - }, - ) - return opampServer, opampClientConnection -} - func newInitialAgentConfigMap() *protobufs.AgentConfigMap { return &protobufs.AgentConfigMap{ ConfigMap: map[string]*protobufs.AgentConfigFile{