Skip to content

Commit

Permalink
(otelarrowreceiver): asynchronous stream operations (#181)
Browse files Browse the repository at this point in the history
This PR
- resolves #182
- applies an request admission memory limiter merged in
#174

---------

Co-authored-by: Joshua MacDonald <[email protected]>
  • Loading branch information
moh-osman3 and jmacd authored May 9, 2024
1 parent dc5112b commit 62b0393
Show file tree
Hide file tree
Showing 17 changed files with 473 additions and 115 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## Unreleased

- Added a semaphore package to limit bytes admitted and total number of waiters. [#174](https://github.com/open-telemetry/otel-arrow/pull/174)
- Refactor otelarrowreceiver to do stream.Recv, request processing, and stream.Send in separate goroutines. [#181](https://github.com/open-telemetry/otel-arrow/pull/181)

- Add a semaphore package to limit bytes admitted and total number of waiters. [#174](https://github.com/open-telemetry/otel-arrow/pull/174)

## [0.22.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.22.0) - 2024-04-16

Expand Down
4 changes: 2 additions & 2 deletions api/experimental/arrow/v1/arrow_service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/experimental/arrow/v1/arrow_service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion collector/examples/bridge/edge-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ service:
address: 127.0.0.1:8888
level: detailed
logs:
level: info
level: info
2 changes: 1 addition & 1 deletion collector/examples/bridge/saas-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ service:
telemetry:
metrics:
address: 127.0.0.1:8889
level: normal
level: normal
2 changes: 1 addition & 1 deletion collector/exporter/otelarrowexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
go.opentelemetry.io/collector/config/configgrpc v0.98.0
go.opentelemetry.io/collector/config/configopaque v1.5.0
go.opentelemetry.io/collector/config/configretry v0.98.0
go.opentelemetry.io/collector/config/configtelemetry v0.98.0
go.opentelemetry.io/collector/config/configtls v0.98.0
go.opentelemetry.io/collector/confmap v0.98.0
go.opentelemetry.io/collector/consumer v0.98.0
Expand Down Expand Up @@ -75,6 +74,7 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/collector/config/confignet v0.98.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.98.0 // indirect
go.opentelemetry.io/collector/config/internal v0.98.0 // indirect
go.opentelemetry.io/collector/featuregate v1.5.0 // indirect
go.opentelemetry.io/collector/receiver v0.98.0 // indirect
Expand Down
29 changes: 16 additions & 13 deletions collector/exporter/otelarrowexporter/internal/arrow/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"context"
"errors"
"math/rand"
"strconv"
"sync"
"time"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -285,19 +285,22 @@ func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {
// exporter, because of the optimization phase performed in the
// conversion to Arrow.
var uncompSize int
if e.telemetry.MetricsLevel > configtelemetry.LevelNormal {
switch data := data.(type) {
case ptrace.Traces:
var sizer ptrace.ProtoMarshaler
uncompSize = sizer.TracesSize(data)
case plog.Logs:
var sizer plog.ProtoMarshaler
uncompSize = sizer.LogsSize(data)
case pmetric.Metrics:
var sizer pmetric.ProtoMarshaler
uncompSize = sizer.MetricsSize(data)
}
switch data := data.(type) {
case ptrace.Traces:
var sizer ptrace.ProtoMarshaler
uncompSize = sizer.TracesSize(data)
case plog.Logs:
var sizer plog.ProtoMarshaler
uncompSize = sizer.LogsSize(data)
case pmetric.Metrics:
var sizer pmetric.ProtoMarshaler
uncompSize = sizer.MetricsSize(data)
}

if md == nil {
md = make(map[string]string)
}
md["otlp-pdata-size"] = strconv.Itoa(uncompSize)

wri := writeItem{
records: data,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,15 @@ func TestArrowExporterHeaders(t *testing.T) {

if times%2 == 1 {
md := metadata.MD{
"expected1": []string{"metadata1"},
"expected2": []string{fmt.Sprint(times)},
"expected1": []string{"metadata1"},
"expected2": []string{fmt.Sprint(times)},
"otlp-pdata-size": []string{"329"},
}
expectOutput = append(expectOutput, md)
} else {
expectOutput = append(expectOutput, nil)
expectOutput = append(expectOutput, metadata.MD{
"otlp-pdata-size": []string{"329"},
})
}

sent, err := tc.exporter.SendAndWait(ctx, input)
Expand Down Expand Up @@ -677,11 +680,14 @@ func TestArrowExporterIsTraced(t *testing.T) {
propagation.TraceContext{}.Inject(ctx, propagation.MapCarrier(expectMap))

md := metadata.MD{
"traceparent": []string{expectMap["traceparent"]},
"traceparent": []string{expectMap["traceparent"]},
"otlp-pdata-size": []string{"329"},
}
expectOutput = append(expectOutput, md)
} else {
expectOutput = append(expectOutput, nil)
expectOutput = append(expectOutput, metadata.MD{
"otlp-pdata-size": []string{"329"},
})
}

sent, err := tc.exporter.SendAndWait(ctx, input)
Expand Down
7 changes: 7 additions & 0 deletions collector/receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type ArrowConfig struct {
// passing through, they will see ResourceExhausted errors.
MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"`

AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"`

// WaiterLimit is the limit on the number of waiters waiting to be processed and consumed.
// This is a dimension of memory limiting to ensure waiters are not consuming an
// unexpectedly large amount of memory in the arrow receiver.
WaiterLimit int64 `mapstructure:"waiter_limit"`

// Zstd settings apply to OTel-Arrow use of gRPC specifically.
Zstd zstd.DecoderConfig `mapstructure:"zstd"`
}
Expand Down
6 changes: 4 additions & 2 deletions collector/receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func TestUnmarshalConfig(t *testing.T) {
},
},
Arrow: ArrowConfig{
MemoryLimitMiB: 123,
MemoryLimitMiB: 123,
AdmissionLimitMiB: 80,
},
},
}, cfg)
Expand All @@ -101,7 +102,8 @@ func TestUnmarshalConfigUnix(t *testing.T) {
ReadBufferSize: 512 * 1024,
},
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
MemoryLimitMiB: defaultMemoryLimitMiB,
AdmissionLimitMiB: defaultAdmissionLimitMiB,
},
},
}, cfg)
Expand Down
2 changes: 2 additions & 0 deletions collector/receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
defaultGRPCEndpoint = "0.0.0.0:4317"

defaultMemoryLimitMiB = 128
defaultAdmissionLimitMiB = defaultMemoryLimitMiB / 2
)

// NewFactory creates a new OTLP receiver factory.
Expand All @@ -46,6 +47,7 @@ func createDefaultConfig() component.Config {
},
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
AdmissionLimitMiB: defaultAdmissionLimitMiB,
},
},
}
Expand Down
6 changes: 5 additions & 1 deletion collector/receiver/otelarrowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/net v0.24.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
)

require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/apache/arrow/go/v14 v14.0.2 // indirect
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
Expand All @@ -55,6 +58,7 @@ require (
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -66,6 +70,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/collector/config/configcompression v1.5.0 // indirect
Expand All @@ -84,6 +89,5 @@ require (
golang.org/x/tools v0.15.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
11 changes: 9 additions & 2 deletions collector/receiver/otelarrowreceiver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO
github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo=
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4=
github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
Expand Down Expand Up @@ -49,6 +53,7 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
Expand All @@ -70,6 +75,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
Expand All @@ -84,8 +91,6 @@ github.com/mostynb/go-grpc-compression v1.2.2/go.mod h1:GOCr2KBxXcblCuczg3YdLQlc
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/open-telemetry/otel-arrow v0.22.0 h1:G1jgtqAM2ho5pyKQ4tyrDzk9Y0VcJ+GZQRJgN26vRlI=
github.com/open-telemetry/otel-arrow v0.22.0/go.mod h1:F50XFaiNfkfB0MYftZIUKFULm6pxfGqjbgQzevi+65M=
github.com/open-telemetry/otel-arrow/collector v0.22.0 h1:lHFjzkh5PbsiW8B63SRntnP9W7bLCXV9lslO4zI0s/Y=
github.com/open-telemetry/otel-arrow/collector v0.22.0/go.mod h1:R7hRwuGDxoGLB27dkJUFKDK7mGG7Yb02ODnLHx8Whis=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -105,6 +110,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
Loading

0 comments on commit 62b0393

Please sign in to comment.