-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a memory limiter processor (#498)
This adds a processor that drops data according to configured memory limits. The processor is important for high load situations when receiving rate exceeds exporting rate (and an extreme case of this is when the target of exporting is unavailable). Typical production run will need to have this processor included in every pipeline immediately after the batch processor.
- Loading branch information
1 parent
9778b16
commit 21a70d6
Showing
13 changed files
with
922 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
// Copyright 2019, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// Package memorylimiter provides a processor for OpenTelemetry Service pipeline | ||
// that drops data on the pipeline according to the current state of memory | ||
// usage. | ||
package memorylimiter | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector/config/configmodels" | ||
) | ||
|
||
// Config defines configuration for memory memoryLimiter processor. | ||
type Config struct { | ||
configmodels.ProcessorSettings `mapstructure:",squash"` | ||
|
||
// CheckInterval is the time between measurements of memory usage for the | ||
// purposes of avoiding going over the limits. Defaults to zero, so no | ||
// checks will be performed. | ||
CheckInterval time.Duration `mapstructure:"check_interval"` | ||
|
||
// MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be | ||
// allocated by the process. | ||
MemoryLimitMiB uint32 `mapstructure:"limit_mib"` | ||
|
||
// MemorySpikeLimitMiB is the maximum, in MiB, spike expected between the | ||
// measurements of memory usage. | ||
MemorySpikeLimitMiB uint32 `mapstructure:"spike_limit_mib"` | ||
|
||
// BallastSizeMiB is the size, in MiB, of the ballast size being used by the | ||
// process. | ||
BallastSizeMiB uint32 `mapstructure:"ballast_size_mib"` | ||
} | ||
|
||
// Name of BallastSizeMiB config option. | ||
const ballastSizeMibKey = "ballast_size_mib" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
// Copyright 2019, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package memorylimiter | ||
|
||
import ( | ||
"path" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector/config" | ||
"github.com/open-telemetry/opentelemetry-collector/config/configmodels" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
factories, err := config.ExampleComponents() | ||
require.NoError(t, err) | ||
factory := &Factory{} | ||
factories.Processors[typeStr] = factory | ||
require.NoError(t, err) | ||
|
||
config, err := config.LoadConfigFile( | ||
t, | ||
path.Join(".", "testdata", "config.yaml"), | ||
factories) | ||
|
||
require.Nil(t, err) | ||
require.NotNil(t, config) | ||
|
||
p0 := config.Processors["memory_limiter"] | ||
assert.Equal(t, p0, | ||
&Config{ | ||
ProcessorSettings: configmodels.ProcessorSettings{ | ||
TypeVal: "memory_limiter", | ||
NameVal: "memory_limiter", | ||
}, | ||
}) | ||
|
||
p1 := config.Processors["memory_limiter/with-settings"] | ||
assert.Equal(t, p1, | ||
&Config{ | ||
ProcessorSettings: configmodels.ProcessorSettings{ | ||
TypeVal: "memory_limiter", | ||
NameVal: "memory_limiter/with-settings", | ||
}, | ||
CheckInterval: 5 * time.Second, | ||
MemoryLimitMiB: 4000, | ||
MemorySpikeLimitMiB: 500, | ||
BallastSizeMiB: 2000, | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
// Copyright 2019, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package memorylimiter | ||
|
||
import ( | ||
"go.uber.org/zap" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector/config/configmodels" | ||
"github.com/open-telemetry/opentelemetry-collector/consumer" | ||
"github.com/open-telemetry/opentelemetry-collector/processor" | ||
) | ||
|
||
const ( | ||
// The value of "type" Attribute Key in configuration. | ||
typeStr = "memory_limiter" | ||
) | ||
|
||
// Factory is the factory for Attribute Key processor. | ||
type Factory struct { | ||
} | ||
|
||
// Type gets the type of the config created by this factory. | ||
func (f *Factory) Type() string { | ||
return typeStr | ||
} | ||
|
||
// CreateDefaultConfig creates the default configuration for processor. Notice | ||
// that the default configuration is expected to fail for this processor. | ||
func (f *Factory) CreateDefaultConfig() configmodels.Processor { | ||
return &Config{ | ||
ProcessorSettings: configmodels.ProcessorSettings{ | ||
TypeVal: typeStr, | ||
NameVal: typeStr, | ||
}, | ||
} | ||
} | ||
|
||
// CreateTraceProcessor creates a trace processor based on this config. | ||
func (f *Factory) CreateTraceProcessor( | ||
logger *zap.Logger, | ||
nextConsumer consumer.TraceConsumer, | ||
cfg configmodels.Processor, | ||
) (processor.TraceProcessor, error) { | ||
return f.createProcessor(logger, nextConsumer, nil, cfg) | ||
} | ||
|
||
// CreateMetricsProcessor creates a metrics processor based on this config. | ||
func (f *Factory) CreateMetricsProcessor( | ||
logger *zap.Logger, | ||
nextConsumer consumer.MetricsConsumer, | ||
cfg configmodels.Processor, | ||
) (processor.MetricsProcessor, error) { | ||
return f.createProcessor(logger, nil, nextConsumer, cfg) | ||
} | ||
|
||
func (f *Factory) createProcessor( | ||
logger *zap.Logger, | ||
traceConsumer consumer.TraceConsumer, | ||
metricConsumer consumer.MetricsConsumer, | ||
cfg configmodels.Processor, | ||
) (processor.DualTypeProcessor, error) { | ||
const mibBytes = 1024 * 1024 | ||
pCfg := cfg.(*Config) | ||
return New( | ||
cfg.Name(), | ||
traceConsumer, | ||
metricConsumer, | ||
pCfg.CheckInterval, | ||
uint64(pCfg.MemoryLimitMiB)*mibBytes, | ||
uint64(pCfg.MemorySpikeLimitMiB)*mibBytes, | ||
uint64(pCfg.BallastSizeMiB)*mibBytes, | ||
logger, | ||
) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
// Copyright 2019, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package memorylimiter | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/zap" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector/config/configcheck" | ||
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" | ||
) | ||
|
||
func TestCreateDefaultConfig(t *testing.T) { | ||
factory := &Factory{} | ||
require.NotNil(t, factory) | ||
|
||
cfg := factory.CreateDefaultConfig() | ||
assert.NotNil(t, cfg, "failed to create default config") | ||
assert.NoError(t, configcheck.ValidateConfig(cfg)) | ||
} | ||
|
||
func TestCreateProcessor(t *testing.T) { | ||
factory := &Factory{} | ||
require.NotNil(t, factory) | ||
|
||
cfg := factory.CreateDefaultConfig() | ||
|
||
// This processor can't be created with the default config. | ||
tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg) | ||
assert.Nil(t, tp) | ||
assert.Error(t, err, "created processor with invalid settings") | ||
|
||
mp, err := factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg) | ||
assert.Nil(t, mp) | ||
assert.Error(t, err, "created processor with invalid settings") | ||
|
||
// Create processor with a valid config. | ||
pCfg := cfg.(*Config) | ||
pCfg.MemoryLimitMiB = 5722 | ||
pCfg.MemorySpikeLimitMiB = 1907 | ||
pCfg.BallastSizeMiB = 2048 | ||
pCfg.CheckInterval = 100 * time.Millisecond | ||
|
||
tp, err = factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, tp) | ||
assert.NoError(t, tp.Shutdown()) | ||
|
||
mp, err = factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, mp) | ||
assert.NoError(t, mp.Shutdown()) | ||
} |
Oops, something went wrong.