Skip to content

Commit

Permalink
Fix: qs: allow saving pipelines without connected agents (SigNoz#4189)
Browse files Browse the repository at this point in the history
* chore: add test validating pipelines can be saved without connected agents

* chore: allow pipelines to be saved without connected agents
  • Loading branch information
raj-k-singh authored Dec 9, 2023
1 parent 6dd34a7 commit 3a1e8d5
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 75 deletions.
8 changes: 4 additions & 4 deletions pkg/query-service/agentConf/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (r *Repo) GetConfigHistory(
disabled,
deploy_status,
deploy_result,
last_hash,
last_config
coalesce(last_hash, '') as last_hash,
coalesce(last_config, '{}') as last_config
FROM agent_config_versions AS v
WHERE element_type = $1
ORDER BY created_at desc, version desc
Expand Down Expand Up @@ -89,8 +89,8 @@ func (r *Repo) GetConfigVersion(
disabled,
deploy_status,
deploy_result,
last_hash,
last_config
coalesce(last_hash, '') as last_hash,
coalesce(last_config, '{}') as last_config
FROM agent_config_versions v
WHERE element_type = $1
AND version = $2`, typ, v)
Expand Down
20 changes: 0 additions & 20 deletions pkg/query-service/agentConf/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,21 +172,6 @@ func (m *Manager) ReportConfigDeploymentStatus(
}
}

// Ready indicates if Manager can accept new config update requests
func (mgr *Manager) Ready() bool {
if atomic.LoadUint32(&mgr.lock) != 0 {
return false
}
return opamp.Ready()
}

// Static methods for working with default manager instance in this module.

// Ready indicates if Manager can accept new config update requests
func Ready() bool {
return m.Ready()
}

func GetLatestVersion(
ctx context.Context, elementType ElementTypeDef,
) (*ConfigVersion, *model.ApiError) {
Expand All @@ -210,11 +195,6 @@ func StartNewVersion(
ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string,
) (*ConfigVersion, *model.ApiError) {

if !m.Ready() {
// agent is already being updated, ask caller to wait and re-try after sometime
return nil, model.UnavailableError(fmt.Errorf("agent updater is busy"))
}

// create a new version
cfg := NewConfigversion(eleType)

Expand Down
2 changes: 2 additions & 0 deletions pkg/query-service/agentConf/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func NewConfigversion(typeDef ElementTypeDef) *ConfigVersion {
IsValid: false,
Disabled: false,
DeployStatus: PendingDeploy,
LastHash: "",
LastConf: "{}",
// todo: get user id from context?
// CreatedBy
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/query-service/app/logparsingpipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ func (ic *LogParsingPipelineController) ApplyPipelines(

}

if !agentConf.Ready() {
return nil, model.UnavailableError(fmt.Errorf(
"agent updater unavailable at the moment. Please try in sometime",
))
}

// prepare config elements
elements := make([]string, len(pipelines))
for i, p := range pipelines {
Expand Down
145 changes: 100 additions & 45 deletions pkg/query-service/tests/integration/logparsingpipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,70 @@ func TestLogPipelinesValidation(t *testing.T) {
}
}

func TestCanSavePipelinesWithoutConnectedAgents(t *testing.T) {
require := require.New(t)
testbed := NewTestbedWithoutOpamp(t)

getPipelinesResp := testbed.GetPipelinesFromQS()
require.Equal(0, len(getPipelinesResp.Pipelines))
require.Equal(0, len(getPipelinesResp.History))

postablePipelines := logparsingpipeline.PostablePipelines{
Pipelines: []logparsingpipeline.PostablePipeline{
{
OrderId: 1,
Name: "pipeline1",
Alias: "pipeline1",
Enabled: true,
Filter: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "method",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
Operator: "=",
Value: "GET",
},
},
},
Config: []logparsingpipeline.PipelineOperator{
{
OrderId: 1,
ID: "add",
Type: "add",
Field: "attributes.test",
Value: "val",
Enabled: true,
Name: "test add",
},
},
},
},
}

testbed.PostPipelinesToQS(postablePipelines)
getPipelinesResp = testbed.GetPipelinesFromQS()
require.Equal(1, len(getPipelinesResp.Pipelines))
require.Equal(1, len(getPipelinesResp.History))

}

// LogPipelinesTestBed coordinates and mocks components involved in
// configuring log pipelines and provides test helpers.
type LogPipelinesTestBed struct {
t *testing.T
testDBFilePath string
testUser *model.User
apiHandler *app.APIHandler
agentConfMgr *agentConf.Manager
opampServer *opamp.Server
opampClientConn *opamp.MockOpAmpConnection
}

func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed {
func NewTestbedWithoutOpamp(t *testing.T) *LogPipelinesTestBed {
// Create a tmp file based sqlite db for testing.
testDBFile, err := os.CreateTemp("", "test-signoz-db-*")
if err != nil {
Expand Down Expand Up @@ -408,22 +461,61 @@ func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed {
t.Fatalf("could not create a new ApiHandler: %v", err)
}

opampServer, clientConn := mockOpampAgent(t, testDBFilePath, controller)

user, apiErr := createTestUser()
if apiErr != nil {
t.Fatalf("could not create a test user: %v", apiErr)
}

// Mock an available opamp agent
testDB, err = opampModel.InitDB(testDBFilePath)
require.Nil(t, err, "failed to init opamp model")

agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: testDB,
DBEngine: "sqlite",
AgentFeatures: []agentConf.AgentFeature{
apiHandler.LogsParsingPipelineController,
}})
require.Nil(t, err, "failed to init agentConf")

return &LogPipelinesTestBed{
t: t,
testUser: user,
apiHandler: apiHandler,
opampServer: opampServer,
opampClientConn: clientConn,
t: t,
testDBFilePath: testDBFilePath,
testUser: user,
apiHandler: apiHandler,
agentConfMgr: agentConfMgr,
}
}

func NewLogPipelinesTestBed(t *testing.T) *LogPipelinesTestBed {
testbed := NewTestbedWithoutOpamp(t)

opampServer := opamp.InitializeServer(nil, testbed.agentConfMgr)
err := opampServer.Start(opamp.GetAvailableLocalAddress())
require.Nil(t, err, "failed to start opamp server")

t.Cleanup(func() {
opampServer.Stop()
})

opampClientConnection := &opamp.MockOpAmpConnection{}
opampServer.OnMessage(
opampClientConnection,
&protobufs.AgentToServer{
InstanceUid: "test",
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: newInitialAgentConfigMap(),
},
},
)

testbed.opampServer = opampServer
testbed.opampClientConn = opampClientConnection

return testbed

}

func (tb *LogPipelinesTestBed) PostPipelinesToQSExpectingStatusCode(
postablePipelines logparsingpipeline.PostablePipelines,
expectedStatusCode int,
Expand Down Expand Up @@ -668,43 +760,6 @@ func assertPipelinesResponseMatchesPostedPipelines(
}
}

func mockOpampAgent(
t *testing.T,
testDBFilePath string,
pipelinesController *logparsingpipeline.LogParsingPipelineController,
) (*opamp.Server, *opamp.MockOpAmpConnection) {
// Mock an available opamp agent
testDB, err := opampModel.InitDB(testDBFilePath)
require.Nil(t, err, "failed to init opamp model")

agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: testDB,
DBEngine: "sqlite",
AgentFeatures: []agentConf.AgentFeature{pipelinesController},
})
require.Nil(t, err, "failed to init agentConf")

opampServer := opamp.InitializeServer(nil, agentConfMgr)
err = opampServer.Start(opamp.GetAvailableLocalAddress())
require.Nil(t, err, "failed to start opamp server")

t.Cleanup(func() {
opampServer.Stop()
})

opampClientConnection := &opamp.MockOpAmpConnection{}
opampServer.OnMessage(
opampClientConnection,
&protobufs.AgentToServer{
InstanceUid: "test",
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: newInitialAgentConfigMap(),
},
},
)
return opampServer, opampClientConnection
}

func newInitialAgentConfigMap() *protobufs.AgentConfigMap {
return &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
Expand Down

0 comments on commit 3a1e8d5

Please sign in to comment.