Skip to content

Commit

Permalink
Fix pipeline setup under serverless (#36973)
Browse files Browse the repository at this point in the history
* fix pipeline setup under serverless

* add docs, fix errors

* fix fix linter
  • Loading branch information
fearful-symmetry authored Oct 27, 2023
1 parent ae11b47 commit a0669d2
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions filebeat/fileset/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type PipelineLoader interface {
LoadJSON(path string, json map[string]interface{}) ([]byte, error)
Request(method, path string, pipeline string, params map[string]string, body interface{}) (int, []byte, error)
GetVersion() version.V
IsServerless() bool
}

// MultiplePipelineUnsupportedError is an error returned when a fileset uses multiple pipelines but is
Expand Down Expand Up @@ -65,16 +66,17 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool
// check that all the required Ingest Node plugins are available
requiredProcessors := fileset.GetRequiredProcessors()
reg.log.Debugf("Required processors: %s", requiredProcessors)
if len(requiredProcessors) > 0 {
// APIs do not exist on serverless
if len(requiredProcessors) > 0 && !esClient.IsServerless() {
err := checkAvailableProcessors(esClient, requiredProcessors)
if err != nil {
return fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module.config.Module, fileset.name, err)
return fmt.Errorf("error loading pipeline for fileset %s/%s: %w", module.config.Module, fileset.name, err)
}
}

pipelines, err := fileset.GetPipelines(esClient.GetVersion())
if err != nil {
return fmt.Errorf("error getting pipeline for fileset %s/%s: %v", module.config.Module, fileset.name, err)
return fmt.Errorf("error getting pipeline for fileset %s/%s: %w", module.config.Module, fileset.name, err)
}

// Filesets with multiple pipelines can only be supported by Elasticsearch >= 6.5.0
Expand All @@ -88,7 +90,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool
for _, pipeline := range pipelines {
err = LoadPipeline(esClient, pipeline.id, pipeline.contents, overwrite, reg.log.With("pipeline", pipeline.id))
if err != nil {
err = fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module.config.Module, fileset.name, err)
err = fmt.Errorf("error loading pipeline for fileset %s/%s: %w", module.config.Module, fileset.name, err)
break
}
pipelineIDsLoaded = append(pipelineIDsLoaded, pipeline.id)
Expand Down Expand Up @@ -169,7 +171,7 @@ func interpretError(initialErr error, body []byte) error {
"This is the response I got from Elasticsearch: %s", body)
}

return fmt.Errorf("couldn't load pipeline: %v. Additionally, error decoding response body: %s",
return fmt.Errorf("couldn't load pipeline: %w. Additionally, error decoding response body: %s",
initialErr, body)
}

Expand All @@ -194,5 +196,5 @@ func interpretError(initialErr error, body []byte) error {
"This is the response I got from Elasticsearch: %s", body)
}

return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
return fmt.Errorf("couldn't load pipeline: %w. Response body: %s", initialErr, body)
}

0 comments on commit a0669d2

Please sign in to comment.