Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(concurrentbatchprocessor): add in-flight byte limit to producer queue #93

Merged
Merged
11 changes: 0 additions & 11 deletions collector/cmd/otelarrowcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,3 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)











2 changes: 0 additions & 2 deletions collector/cmd/otelarrowcol/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,6 @@ github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lightstep/telemetry-generator/generatorreceiver v0.15.0 h1:gYEMyJHTSczSIRbkiVYQDH1ScQxyQKNgXJG3WarmtOE=
github.com/lightstep/telemetry-generator/generatorreceiver v0.15.0/go.mod h1:pXv7/nt9MWXKio5S2deXbgq0q8JEKvm8IWJTLpNolqQ=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
101 changes: 73 additions & 28 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -45,6 +46,7 @@ type batchProcessor struct {
timeout time.Duration
sendBatchSize int
sendBatchMaxSize int
maxInFlightBytes int

// batchFunc is a factory for new batch objects corresponding
// with the appropriate signal.
Expand Down Expand Up @@ -91,6 +93,7 @@ type shard struct {
// newItem is used to receive data items from producers.
newItem chan dataItem

sem *semaphore.Weighted
// batch is an in-flight data item containing one of the
// underlying data types.
batch batch
Expand All @@ -115,13 +118,15 @@ type dataItem struct {
type batch interface {
// export the current batch
export(ctx context.Context, req any) error
splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, bytes int, req any)
splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, req any)

// itemCount returns the size of the current batch
itemCount() int

// add item to the current batch
add(item any)

sizeBytes(data any) int
}

// countedError is useful when a producer adds items that are split
Expand Down Expand Up @@ -161,6 +166,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
maxInFlightBytes: int(cfg.MaxInFlightBytes),
}
if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
Expand Down Expand Up @@ -189,6 +195,7 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard {
newItem: make(chan dataItem, runtime.NumCPU()),
exportCtx: exportCtx,
batch: bp.batchFunc(),
sem: semaphore.NewWeighted(int64(bp.maxInFlightBytes)),
}

b.processor.goroutines.Add(1)
Expand Down Expand Up @@ -302,7 +309,8 @@ func (b *shard) resetTimer() {
}

func (b *shard) sendItems(trigger trigger) {
sent, bytes, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed)
bytes := int64(b.batch.sizeBytes(req))

var waiters []chan error
var countItems []int
Expand Down Expand Up @@ -344,7 +352,7 @@ func (b *shard) sendItems(trigger trigger) {
if err != nil {
b.processor.logger.Warn("Sender failed", zap.Error(err))
} else {
b.processor.telemetry.record(trigger, int64(sent), int64(bytes))
b.processor.telemetry.record(trigger, int64(sent), bytes)
}
}()

Expand All @@ -354,8 +362,6 @@ func (b *shard) sendItems(trigger trigger) {

func (b *shard) consumeAndWait(ctx context.Context, data any) error {
respCh := make(chan error, 1)
// TODO: add a semaphore to only write to channel if sizeof(data) keeps
// us below some configured inflight byte limit.
item := dataItem{
data: data,
responseCh: respCh,
Expand All @@ -370,12 +376,42 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
item.count = telem.LogRecordCount()
}

bytes := int64(b.batch.sizeBytes(data))
err := b.sem.Acquire(ctx, bytes)
if err != nil {
return err
}

// This deferred function ensures the semaphore releases all of the
// bytes acquired above, on every return path.
defer func() {
if item.count == 0 {
b.sem.Release(bytes)
return
}

// The context may have timed out before we received all
// responses. Start a goroutine that waits for the remaining
// responses and releases all acquired bytes after the calling
// goroutine has returned.
go func() {
for newErr := range respCh {
unwrap := newErr.(countedError)

item.count -= unwrap.count
if item.count != 0 {
continue
}
break
}
b.sem.Release(bytes)
}()
}()

select {
case <-ctx.Done():
return ctx.Err()
case b.newItem <- item:
}
var err error

for {
select {
Expand Down Expand Up @@ -506,6 +542,12 @@ func newBatchLogsProcessor(set processor.CreateSettings, next consumer.Logs, cfg
return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) }, useOtel)
}

// recoverError converts a panic into an error stored in *retErr.
//
// It must be installed directly as the deferred call, i.e.
// `defer recoverError(&retErr)` with retErr being a named result, because
// recover only has an effect when called by the deferred function itself.
//
// When the recovered value is already an error it is wrapped with %w, so
// callers can still match it with errors.Is / errors.As; the message is the
// same as formatting the value with %v. Any other panic value is formatted
// with %v. When there is no panic, *retErr is left untouched.
func recoverError(retErr *error) {
	r := recover()
	if r == nil {
		return
	}
	if err, ok := r.(error); ok {
		// Preserve the error chain instead of flattening it to a string.
		*retErr = fmt.Errorf("%w", err)
		return
	}
	*retErr = fmt.Errorf("%v", r)
}

type batchTraces struct {
nextConsumer consumer.Traces
traceData ptrace.Traces
Expand All @@ -530,15 +572,19 @@ func (bt *batchTraces) add(item any) {
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
}

func (bt *batchTraces) export(ctx context.Context, req any) error {
// sizeBytes reports the size of data as computed by the batch's sizer.
// data must hold a ptrace.Traces; any other dynamic type makes the type
// assertion panic.
func (bt *batchTraces) sizeBytes(data any) int {
	td := data.(ptrace.Traces)
	return bt.sizer.TracesSize(td)
}

// export hands req, which must hold a ptrace.Traces, to the next traces
// consumer and returns that consumer's error.
func (bt *batchTraces) export(ctx context.Context, req any) (retErr error) {
	// Installed before anything else so that a panic raised by the type
	// assertion or by the downstream consumer is reported through retErr.
	defer recoverError(&retErr)
	retErr = bt.nextConsumer.ConsumeTraces(ctx, req.(ptrace.Traces))
	return retErr
}

func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) {
func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, any) {
var req ptrace.Traces
var sent int
var bytes int
if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize {
req = splitTraces(sendBatchMaxSize, bt.traceData)
bt.spanCount -= sendBatchMaxSize
Expand All @@ -549,10 +595,7 @@ func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, ret
bt.traceData = ptrace.NewTraces()
bt.spanCount = 0
}
if returnBytes {
bytes = bt.sizer.TracesSize(req)
}
return sent, bytes, req
return sent, req
}

func (bt *batchTraces) itemCount() int {
Expand All @@ -570,15 +613,19 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: &pmetric.ProtoMarshaler{}}
}

func (bm *batchMetrics) export(ctx context.Context, req any) error {
// sizeBytes reports the size of data as computed by the batch's sizer.
// data must hold a pmetric.Metrics; any other dynamic type makes the type
// assertion panic.
func (bm *batchMetrics) sizeBytes(data any) int {
	md := data.(pmetric.Metrics)
	return bm.sizer.MetricsSize(md)
}

// export hands req, which must hold a pmetric.Metrics, to the next metrics
// consumer and returns that consumer's error.
func (bm *batchMetrics) export(ctx context.Context, req any) (retErr error) {
	// Installed before anything else so that a panic raised by the type
	// assertion or by the downstream consumer is reported through retErr.
	defer recoverError(&retErr)
	retErr = bm.nextConsumer.ConsumeMetrics(ctx, req.(pmetric.Metrics))
	return retErr
}

func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) {
func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, any) {
var req pmetric.Metrics
var sent int
var bytes int
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
req = splitMetrics(sendBatchMaxSize, bm.metricData)
bm.dataPointCount -= sendBatchMaxSize
Expand All @@ -590,10 +637,7 @@ func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, re
bm.dataPointCount = 0
}

if returnBytes {
bytes = bm.sizer.MetricsSize(req)
}
return sent, bytes, req
return sent, req
}

func (bm *batchMetrics) itemCount() int {
Expand Down Expand Up @@ -622,15 +666,19 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: &plog.ProtoMarshaler{}}
}

func (bl *batchLogs) export(ctx context.Context, req any) error {
// sizeBytes reports the size of data as computed by the batch's sizer.
// data must hold a plog.Logs; any other dynamic type makes the type
// assertion panic.
func (bl *batchLogs) sizeBytes(data any) int {
	ld := data.(plog.Logs)
	return bl.sizer.LogsSize(ld)
}

// export hands req, which must hold a plog.Logs, to the next logs consumer
// and returns that consumer's error.
func (bl *batchLogs) export(ctx context.Context, req any) (retErr error) {
	// Installed before anything else so that a panic raised by the type
	// assertion or by the downstream consumer is reported through retErr.
	defer recoverError(&retErr)
	retErr = bl.nextConsumer.ConsumeLogs(ctx, req.(plog.Logs))
	return retErr
}

func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) {
func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, any) {
var req plog.Logs
var sent int
var bytes int

if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize {
req = splitLogs(sendBatchMaxSize, bl.logData)
Expand All @@ -642,10 +690,7 @@ func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, retur
bl.logData = plog.NewLogs()
bl.logCount = 0
}
if returnBytes {
bytes = bl.sizer.LogsSize(req)
}
return sent, bytes, req
return sent, req
}

func (bl *batchLogs) itemCount() int {
Expand All @@ -661,4 +706,4 @@ func (bl *batchLogs) add(item any) {
}
bl.logCount += newLogsCount
ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
}
}
Loading