Skip to content

Commit

Permalink
[pkg/stanza] make log emitter and entry conversion in adapter synchro…
Browse files Browse the repository at this point in the history
…nous (#35669)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR changes the `LogEmitter` to accept a synchronous consumer
callback function for processing a batch of log entries as an
alternative to sending log entry batches to a channel.
The components that use the `LogEmitter` (adapter and parser) have been
adapted accordingly. Also, in the case of the adapter, the log entries
are converted directly, rather than sending them over a channel to the
converter and receiving the converted results over a different channel.

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes #35453 

<!--Describe what testing was performed and which tests were added.-->
#### Testing

I did some initial performance tests using the `TestLogLargeFiles` load
test to see how this change affects the performance. Below are the
results:

**Before the change (i.e. with async log entry batch processing)**

```
=== RUN   TestLogLargeFiles/filelog-largefiles-2Gb-lifetime
2024/10/08 09:02:53  | Sent:17,769,795 logs (179,507/sec) | Received:17,755,188 items (179,346/sec)

=== RUN   TestLogLargeFiles/filelog-largefiles-6GB-lifetime
2024/10/08 09:06:29  | Sent:42,857,755 logs (216,465/sec) | Received:42,851,987 items (216,424/sec)

Test                                         |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
---------------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
LogLargeFiles/filelog-largefiles-2Gb-lifetime|PASS  |    100s|    73.1|    78.4|        106|        118|  18249451|      18249451|
LogLargeFiles/filelog-largefiles-6GB-lifetime|PASS  |    200s|    87.5|    98.1|        110|        116|  44358460|      44358460|
```

**After the change (i.e. with sync log entry batch processing)**

```
=== RUN   TestLogLargeFiles/filelog-largefiles-2Gb-lifetime
2024/10/08 10:09:51 Agent RAM (RES): 139 MiB, CPU:71.7% | Sent:17,802,561 logs (179,836/sec) | Received:17,788,273 items (179,680/sec)

=== RUN   TestLogLargeFiles/filelog-largefiles-6GB-lifetime
2024/10/08 10:05:15 Agent RAM (RES): 140 MiB, CPU:95.6% | Sent:42,912,030 logs (216,744/sec) | Received:42,904,306 items (216,689/sec)

Test                                         |Result|Duration|CPU Avg%|CPU Max%|RAM Avg MiB|RAM Max MiB|Sent Items|Received Items|
---------------------------------------------|------|-------:|-------:|-------:|----------:|----------:|---------:|-------------:|
LogLargeFiles/filelog-largefiles-2Gb-lifetime|PASS  |    100s|    74.8|    78.9|        127|        139|  17984687|      17984687|
LogLargeFiles/filelog-largefiles-6GB-lifetime|PASS  |    200s|    89.3|   100.9|        134|        140|  43376210|      43376210|
```

Those results seem to indicate comparable throughput, but with an
increased resource consumption, especially in terms of memory.

I also did a test comparing the performance between the synchronous and
asynchronous log emitter using the same methodology as in #35454.

The results were the following, and indicate an increase in the time it
takes for reading the generated log file (see #35454 for details on how
the file is generated and the test execution):

- Async Log Emitter: ~8s
- Sync Log Emitter: ~12s


<details>
  <summary>output-async.log</summary>
=== Step 3: Thu Oct 10 08:54:23 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
2.209674e+06
=== Step 4: Thu Oct 10 08:54:25 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
5.428103e+06
=== Step 5: Thu Oct 10 08:54:26 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
7.337017e+06
=== Step 6: Thu Oct 10 08:54:27 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
9.258843e+06
=== Step 7: Thu Oct 10 08:54:29 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
1.3082428e+07
=== Step 8: Thu Oct 10 08:54:31 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="3744d4cb-5080-427c-8c16-a96cb40a57d4",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
1.6519068e+07
</details>

<details>
  <summary>output-sync.log</summary>
=== Step 2: Thu Oct 10 08:51:27 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
1.580891e+06
=== Step 3: Thu Oct 10 08:51:28 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
3.01034e+06
=== Step 4: Thu Oct 10 08:51:29 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
4.434627e+06
=== Step 5: Thu Oct 10 08:51:31 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
7.416612e+06
=== Step 6: Thu Oct 10 08:51:34 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
1.0496072e+07
=== Step 7: Thu Oct 10 08:51:36 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
1.3523882e+07
=== Step 8: Thu Oct 10 08:51:37 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
1.4929707e+07
=== Step 9: Thu Oct 10 08:51:39 CEST 2024

otelcol_receiver_accepted_log_records{receiver="filelog",service_instance_id="dcf5371b-19eb-47b3-a820-756c1832b448",service_name="otelcontribcol",service_version="0.111.0-dev",transport=""}
1.6519105e+07
</details>

---------

Signed-off-by: Florian Bacher <[email protected]>
Co-authored-by: Andrzej Stencel <[email protected]>
  • Loading branch information
bacherfl and andrzej-stencel authored Nov 7, 2024
1 parent 2b58a15 commit c840e69
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 377 deletions.
27 changes: 27 additions & 0 deletions .chloggen/stanza-sync-log-emitter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Synchronous handling of entries passed from the log emitter to the receiver adapter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35453]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
54 changes: 28 additions & 26 deletions pkg/stanza/adapter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,44 +155,46 @@ func (c *Converter) workerLoop() {
defer c.wg.Done()

for entries := range c.workerChan {
// Send plogs directly to flushChan
c.flushChan <- ConvertEntries(entries)
}
}

resourceHashToIdx := make(map[uint64]int)
scopeIdxByResource := make(map[uint64]map[string]int)
func ConvertEntries(entries []*entry.Entry) plog.Logs {
resourceHashToIdx := make(map[uint64]int)
scopeIdxByResource := make(map[uint64]map[string]int)

pLogs := plog.NewLogs()
var sl plog.ScopeLogs
pLogs := plog.NewLogs()
var sl plog.ScopeLogs

for _, e := range entries {
resourceID := HashResource(e.Resource)
var rl plog.ResourceLogs
for _, e := range entries {
resourceID := HashResource(e.Resource)
var rl plog.ResourceLogs

resourceIdx, ok := resourceHashToIdx[resourceID]
if !ok {
resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()
resourceIdx, ok := resourceHashToIdx[resourceID]
if !ok {
resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()

rl = pLogs.ResourceLogs().AppendEmpty()
upsertToMap(e.Resource, rl.Resource().Attributes())
rl = pLogs.ResourceLogs().AppendEmpty()
upsertToMap(e.Resource, rl.Resource().Attributes())

scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
rl = pLogs.ResourceLogs().At(resourceIdx)
scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
if !ok {
scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
rl = pLogs.ResourceLogs().At(resourceIdx)
scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
if !ok {
scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
}
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
}
convertInto(e, sl.LogRecords().AppendEmpty())
}

// Send plogs directly to flushChan
c.flushChan <- pLogs
convertInto(e, sl.LogRecords().AppendEmpty())
}
return pLogs
}

func (c *Converter) flushLoop() {
Expand Down
44 changes: 21 additions & 23 deletions pkg/stanza/adapter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,30 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {

operators := append([]operator.Config{inputCfg}, baseCfg.Operators...)

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: params.ID,
ReceiverCreateSettings: params,
})
if err != nil {
return nil, err
}
rcv := &receiver{
set: params.TelemetrySettings,
id: params.ID,
consumer: consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
obsrecv: obsrecv,
storageID: baseCfg.StorageID,
}

var emitterOpts []helper.EmitterOption
if baseCfg.maxBatchSize > 0 {
emitterOpts = append(emitterOpts, helper.WithMaxBatchSize(baseCfg.maxBatchSize))
}
if baseCfg.flushInterval > 0 {
emitterOpts = append(emitterOpts, helper.WithFlushInterval(baseCfg.flushInterval))
}
emitter := helper.NewLogEmitter(params.TelemetrySettings, emitterOpts...)

emitter := helper.NewLogEmitter(params.TelemetrySettings, rcv.consumeEntries, emitterOpts...)
pipe, err := pipeline.Config{
Operators: operators,
DefaultOutput: emitter,
Expand All @@ -62,27 +78,9 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
return nil, err
}

var converterOpts []converterOption
if baseCfg.numWorkers > 0 {
converterOpts = append(converterOpts, withWorkerCount(baseCfg.numWorkers))
}
converter := NewConverter(params.TelemetrySettings, converterOpts...)
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: params.ID,
ReceiverCreateSettings: params,
})
if err != nil {
return nil, err
}
return &receiver{
set: params.TelemetrySettings,
id: params.ID,
pipe: pipe,
emitter: emitter,
consumer: consumerretry.NewLogs(baseCfg.RetryOnFailure, params.Logger, nextConsumer),
converter: converter,
obsrecv: obsrecv,
storageID: baseCfg.StorageID,
}, nil
rcv.emitter = emitter
rcv.pipe = pipe

return rcv, nil
}
}
23 changes: 13 additions & 10 deletions pkg/stanza/adapter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
set := componenttest.NewNopTelemetrySettings()
set.Logger = zap.NewNop()
emitter := helper.NewLogEmitter(set)

pipe, err := pipeline.Config{
Operators: []operator.Config{
{
Expand All @@ -48,15 +48,18 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
return nil, err
}

return &receiver{
set: set,
id: component.MustNewID("testReceiver"),
pipe: pipe,
emitter: emitter,
consumer: nextConsumer,
converter: NewConverter(componenttest.NewNopTelemetrySettings()),
obsrecv: obsrecv,
}, nil
rcv := &receiver{
set: set,
id: component.MustNewID("testReceiver"),
pipe: pipe,
consumer: nextConsumer,
obsrecv: obsrecv,
}

emitter := helper.NewLogEmitter(set, rcv.consumeEntries)

rcv.emitter = emitter
return rcv, nil
}

// BenchmarkEmitterToConsumer serves as a benchmark for entries going from the emitter to consumer,
Expand Down
98 changes: 16 additions & 82 deletions pkg/stanza/adapter/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-con
import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -16,22 +15,19 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)

type receiver struct {
set component.TelemetrySettings
id component.ID
emitWg sync.WaitGroup
consumeWg sync.WaitGroup
cancel context.CancelFunc

pipe pipeline.Pipeline
emitter *helper.LogEmitter
consumer consumer.Logs
converter *Converter
obsrecv *receiverhelper.ObsReport
set component.TelemetrySettings
id component.ID

pipe pipeline.Pipeline
emitter *helper.LogEmitter
consumer consumer.Logs
obsrecv *receiverhelper.ObsReport

storageID *component.ID
storageClient storage.Client
Expand All @@ -42,8 +38,6 @@ var _ rcvr.Logs = (*receiver)(nil)

// Start tells the receiver to start
func (r *receiver) Start(ctx context.Context, host component.Host) error {
rctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.set.Logger.Info("Starting stanza receiver")

if err := r.setStorageClient(ctx, host); err != nil {
Expand All @@ -54,86 +48,26 @@ func (r *receiver) Start(ctx context.Context, host component.Host) error {
return fmt.Errorf("start stanza: %w", err)
}

r.converter.Start()

// Below we're starting 2 loops:
// * one which reads all the logs produced by the emitter and then forwards
// them to converter
// ...
r.emitWg.Add(1)
go r.emitterLoop()

// ...
// * second one which reads all the logs produced by the converter
// (aggregated by Resource) and then calls consumer to consume them.
r.consumeWg.Add(1)
go r.consumerLoop(rctx)

// Those 2 loops are started in separate goroutines because batching in
// the emitter loop can cause a flush, caused by either reaching the max
// flush size or by the configurable ticker which would in turn cause
// a set of log entries to be available for reading in converter's out
// channel. In order to prevent backpressure, reading from the converter
// channel and batching are done in those 2 goroutines.

return nil
}

// emitterLoop reads the log entries produced by the emitter and batches them
// in converter.
func (r *receiver) emitterLoop() {
defer r.emitWg.Done()

// Don't create done channel on every iteration.
// emitter.OutChannel is closed on ctx.Done(), no need to handle ctx here
// instead we should drain and process the channel to let emitter cancel properly
for e := range r.emitter.OutChannel() {
if err := r.converter.Batch(e); err != nil {
r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
}
}
func (r *receiver) consumeEntries(ctx context.Context, entries []*entry.Entry) {
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
pLogs := ConvertEntries(entries)
logRecordCount := pLogs.LogRecordCount()

r.set.Logger.Debug("Emitter loop stopped")
}

// consumerLoop reads converter log entries and calls the consumer to consumer them.
func (r *receiver) consumerLoop(ctx context.Context) {
defer r.consumeWg.Done()

// Don't create done channel on every iteration.
// converter.OutChannel is closed on Shutdown before context is cancelled.
// Drain the channel and process events before exiting
for pLogs := range r.converter.OutChannel() {
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()

cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}

r.set.Logger.Debug("Consumer loop stopped")
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
}

// Shutdown is invoked during service shutdown
func (r *receiver) Shutdown(ctx context.Context) error {
if r.cancel == nil {
return nil
}

r.set.Logger.Info("Stopping stanza receiver")
pipelineErr := r.pipe.Stop()

// wait for emitter to finish batching and let consumers catch up
r.emitWg.Wait()

r.converter.Stop()
r.cancel()
// wait for consumers to catch up
r.consumeWg.Wait()

if r.storageClient != nil {
clientErr := r.storageClient.Close(ctx)
return multierr.Combine(pipelineErr, clientErr)
Expand Down
Loading

0 comments on commit c840e69

Please sign in to comment.