diff --git a/pluginmanager/logstore_config.go b/pluginmanager/logstore_config.go index 4403fc3e24..e04c7af46e 100644 --- a/pluginmanager/logstore_config.go +++ b/pluginmanager/logstore_config.go @@ -108,6 +108,7 @@ type LogstoreConfig struct { EnvSet map[string]struct{} CollectingContainersMeta bool pluginID int32 + extensionIndex int32 } func (p *LogstoreStatistics) Init(context pipeline.Context) { @@ -503,6 +504,8 @@ func createLogstoreConfig(project string, logstore string, configName string, lo } else if _, isServiceInput := pipeline.ServiceInputs[pluginType]; isServiceInput { // Load ServiceInput plugin defined in pipeline.ServiceInputs err = loadService(logstoreC.genPluginMeta(pluginTypeWithIDStr), logstoreC, input["detail"]) + } else { + err = fmt.Errorf("invalid input type: %s", pluginTypeWithIDStr) } if err != nil { return nil, err @@ -799,6 +802,7 @@ func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string) *pipeline.Plugi } if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 { if pluginID, err := strconv.ParseInt(pluginTypeWithID[ids+1:], 10, 32); err == nil { + atomic.StoreInt32(&lc.extensionIndex, 0) atomic.StoreInt32(&lc.pluginID, int32(pluginID)) } return &pipeline.PluginMeta{ @@ -808,6 +812,8 @@ func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string) *pipeline.Plugi } } } + + // only for anonymous extensions. pluginType := pluginTypeWithID pluginID := lc.genPluginID() pluginTypeWithID = fmt.Sprintf("%s/%s", pluginType, pluginID) @@ -860,7 +866,7 @@ func GetPluginPriority(pluginTypeWithID string) int { } func (lc *LogstoreConfig) genPluginID() string { - return fmt.Sprintf("%v", atomic.AddInt32(&lc.pluginID, 1)) + return fmt.Sprintf("%v_%v", atomic.LoadInt32(&lc.pluginID), atomic.AddInt32(&lc.extensionIndex, 1)) } func init() { diff --git a/pluginmanager/logstore_config_test.go b/pluginmanager/logstore_config_test.go index 7674af3bdc..65e3bd7dee 100644 --- a/pluginmanager/logstore_config_test.go +++ b/pluginmanager/logstore_config_test.go @@ -33,6 +33,8 @@ import ( "github.com/alibaba/ilogtail/pkg/protocol" "github.com/alibaba/ilogtail/pkg/util" "github.com/alibaba/ilogtail/plugins/extension/basicauth" + _ "github.com/alibaba/ilogtail/plugins/extension/request_breaker" + _ "github.com/alibaba/ilogtail/plugins/flusher/http" "github.com/alibaba/ilogtail/plugins/input" "github.com/alibaba/ilogtail/plugins/processor/regex" ) @@ -360,6 +362,123 @@ func (s *logstoreConfigTestSuite) TestGetExtension() { s.NotEqual(ext, ext2) } +func (s *logstoreConfigTestSuite) TestGetExtensionMultiAnonymousExtensions() { + jsonStr := ` + { + "inputs": [ + { + "type": "service_mock", + "detail": { + "LogsPerSecond": 100, + "Fields": { + "content": "Active connections: 1\nserver accepts handled requests\n 6079 6079 11596\n Reading: 0 Writing: 1 Waiting: 0" + } + } + } + ], + "processors": [ + { + "type": "processor_regex", + "detail": { + "SourceKey": "content", + "Regex": "Active connections: (\\d+)\\s+server accepts handled requests\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+Reading: (\\d+) Writing: (\\d+) Waiting: (\\d+).*", + "Keys": [ + "connection", + "accepts", + "handled", + "requests", + "reading", + "writing", + "waiting" + ], + "FullMatch": true, + "NoKeyError": true, + "NoMatchError": true, + "KeepSource": true + } + } + ], + "aggregators": [ + { + "type": "aggregator_default" + } + ], + "flushers": [ + { + "type": "flusher_http", + "detail": { + "RemoteURL" : "http://test2.com/write", + "RequestInterceptors" : + [ + { + "Type" : "ext_request_breaker" + }, + { + "Type" : "ext_request_breaker" + } + ] + } + }, + { + "type": "flusher_http", + "detail": { + "RemoteURL" : "http://test2.com/write", + "RequestInterceptors" : + [ + { + "Type" : "ext_request_breaker" + }, + { + "Type" : "ext_request_breaker" + } + ] + } + } + ], + "extensions": [ + { + "type": "ext_basicauth/basicauth_user1", + "detail": { + "Username": "user1", + "Password": "pwd1" + } + } + ] + } +` + + s.NoError(LoadAndStartMockConfig("project", "logstore", "test", jsonStr)) + s.Equal(len(LogtailConfig), 1) + config := LogtailConfig["test"] + s.Equal(config.ProjectName, "project") + s.Equal(config.LogstoreName, "logstore") + s.Equal(config.ConfigName, "test") + s.Equal(config.LogstoreKey, int64(666)) + s.Equal(len(config.PluginRunner.(*pluginv1Runner).MetricPlugins), 0) + s.Equal(len(config.PluginRunner.(*pluginv1Runner).ServicePlugins), 1) + s.Equal(len(config.PluginRunner.(*pluginv1Runner).ProcessorPlugins), 1) + s.Equal(len(config.PluginRunner.(*pluginv1Runner).AggregatorPlugins), 1) + s.Equal(len(config.PluginRunner.(*pluginv1Runner).FlusherPlugins), 2) + s.Equal(len(config.PluginRunner.(*pluginv1Runner).ExtensionPlugins), 5) // should have 5 extentsion + // global config + s.Equal(config.GlobalConfig, &global_config.LoongcollectorGlobalConfig) + + // check plugin inner info + _, ok := config.PluginRunner.(*pluginv1Runner).ProcessorPlugins[0].Processor.(*regex.ProcessorRegex) + s.True(ok) + _, ok = config.PluginRunner.(*pluginv1Runner).ExtensionPlugins["ext_basicauth/basicauth_user1"].(*basicauth.ExtensionBasicAuth) + s.True(ok) + + ext, err := config.Context.GetExtension("ext_basicauth/basicauth_user1", nil) + s.Nil(err) + s.NotNil(ext) + + ext2, err := config.Context.GetExtension("ext_basicauth", map[string]interface{}{"Username": "user2", "Password": "pwd2"}) + s.Nil(err) + s.NotNil(ext) + s.NotEqual(ext, ext2) +} + func Test_hasDockerStdoutInput(t *testing.T) { { plugins := map[string]interface{}{ diff --git a/pluginmanager/plugin_runner_v1.go b/pluginmanager/plugin_runner_v1.go index ed8578873c..fb711e4e1c 100644 --- a/pluginmanager/plugin_runner_v1.go +++ b/pluginmanager/plugin_runner_v1.go @@ -110,6 +110,7 @@ func (p *pluginv1Runner) AddPlugin(pluginMeta *pipeline.PluginMeta, category plu } case pluginExtension: if extension, ok := plugin.(pipeline.Extension); ok { + logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "add extension, key:", pluginMeta.PluginTypeWithID) return p.addExtension(pluginMeta.PluginTypeWithID, extension) } default: @@ -419,7 +420,8 @@ func (p *pluginv1Runner) Stop(exit bool) error { } logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "flusher plugins stop", "done") - for _, extension := range p.ExtensionPlugins { + for pluginTypeWithID, extension := range p.ExtensionPlugins { + logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "stop extension:", pluginTypeWithID, "description", extension.Description()) err := extension.Stop() if err != nil { logger.Warningf(p.LogstoreConfig.Context.GetRuntimeContext(), "STOP_EXTENSION_ALARM", diff --git a/pluginmanager/plugin_runner_v2.go b/pluginmanager/plugin_runner_v2.go index 2db673c426..65ce32becc 100644 --- a/pluginmanager/plugin_runner_v2.go +++ b/pluginmanager/plugin_runner_v2.go @@ -114,6 +114,7 @@ func (p *pluginv2Runner) AddPlugin(pluginMeta *pipeline.PluginMeta, category plu } case pluginExtension: if extension, ok := plugin.(pipeline.Extension); ok { + logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "add extension, key:", pluginMeta.PluginTypeWithID) return p.addExtension(pluginMeta.PluginTypeWithID, extension) } default: @@ -433,7 +434,8 @@ func (p *pluginv2Runner) Stop(exit bool) error { } logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "Flusher plugins stop", "done") - for _, extension := range p.ExtensionPlugins { + for pluginTypeWithID, extension := range p.ExtensionPlugins { + logger.Debug(p.LogstoreConfig.Context.GetRuntimeContext(), "stop extension:", pluginTypeWithID, "description", extension.Description()) err := extension.Stop() if err != nil { logger.Warningf(p.LogstoreConfig.Context.GetRuntimeContext(), "STOP_EXTENSION_ALARM",