From a0669d2d9c96ee28709a88fe29a502e325675c90 Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Fri, 27 Oct 2023 11:26:42 -0700 Subject: [PATCH] Fix pipeline setup under serverless (#36973) * fix pipeline setup under serverless * add docs, fix errors * fix fix linter --- filebeat/fileset/pipelines.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 07d6c4c21ac3..c58a7c13edcf 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -37,6 +37,7 @@ type PipelineLoader interface { LoadJSON(path string, json map[string]interface{}) ([]byte, error) Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error) GetVersion() version.V + IsServerless() bool } // MultiplePipelineUnsupportedError is an error returned when a fileset uses multiple pipelines but is @@ -65,16 +66,17 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool // check that all the required Ingest Node plugins are available requiredProcessors := fileset.GetRequiredProcessors() reg.log.Debugf("Required processors: %s", requiredProcessors) - if len(requiredProcessors) > 0 { + // APIs do not exist on serverless + if len(requiredProcessors) > 0 && !esClient.IsServerless() { err := checkAvailableProcessors(esClient, requiredProcessors) if err != nil { - return fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module.config.Module, fileset.name, err) + return fmt.Errorf("error loading pipeline for fileset %s/%s: %w", module.config.Module, fileset.name, err) } } pipelines, err := fileset.GetPipelines(esClient.GetVersion()) if err != nil { - return fmt.Errorf("error getting pipeline for fileset %s/%s: %v", module.config.Module, fileset.name, err) + return fmt.Errorf("error getting pipeline for fileset %s/%s: %w", module.config.Module, fileset.name, err) } // Filesets with multiple pipelines can only be supported by Elasticsearch >= 6.5.0 @@ -88,7 +90,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool for _, pipeline := range pipelines { err = LoadPipeline(esClient, pipeline.id, pipeline.contents, overwrite, reg.log.With("pipeline", pipeline.id)) if err != nil { - err = fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module.config.Module, fileset.name, err) + err = fmt.Errorf("error loading pipeline for fileset %s/%s: %w", module.config.Module, fileset.name, err) break } pipelineIDsLoaded = append(pipelineIDsLoaded, pipeline.id) @@ -169,7 +171,7 @@ func interpretError(initialErr error, body []byte) error { "This is the response I got from Elasticsearch: %s", body) } - return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s", + return fmt.Errorf("couldn't load pipeline: %w. Additionally, error decoding response body: %s", initialErr, body) } @@ -194,5 +196,5 @@ func interpretError(initialErr error, body []byte) error { "This is the response I got from Elasticsearch: %s", body) } - return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body) + return fmt.Errorf("couldn't load pipeline: %w. Response body: %s", initialErr, body) }