Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(otelarrowreceiver): asynchronous stream operations #181

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9a73c2a
add semaphore files
moh-osman3 Apr 9, 2024
e30c220
add uncompSz to proto defintion and apply semaphore
moh-osman3 Apr 14, 2024
3ec41e4
add AdmissionLimitMiB separate from MemoryLimitMiB, always record unc…
moh-osman3 Apr 16, 2024
8e16255
closer to working
moh-osman3 Apr 18, 2024
2a3fa37
return error in goroutine
moh-osman3 Apr 23, 2024
45f2f9a
refactor solve race, broken tests still
moh-osman3 Apr 23, 2024
3e68d50
almost
moh-osman3 Apr 24, 2024
212fca9
painful but fixed?
moh-osman3 Apr 25, 2024
0d9f180
rm
moh-osman3 Apr 25, 2024
366936e
rebase and fix tests again
moh-osman3 Apr 25, 2024
4c84297
fix race
moh-osman3 Apr 25, 2024
e1b920a
remove unneded package
moh-osman3 Apr 25, 2024
f93a37c
remove replace
moh-osman3 Apr 25, 2024
77364fe
review feedback
moh-osman3 Apr 26, 2024
a8448e4
remove unneeded arg
moh-osman3 Apr 26, 2024
12f1b8c
rm uncompressed_size from proto
moh-osman3 May 1, 2024
6f8ba19
add header, acquire diff
moh-osman3 May 1, 2024
af29366
fix potential deadlock
moh-osman3 May 2, 2024
3b09b6c
unit tests
moh-osman3 May 3, 2024
1258605
use assertEqualUnsortedSpans
moh-osman3 May 3, 2024
25366e0
remove unneeded helper functions and use otelAssert.Equiv
moh-osman3 May 3, 2024
40f7fa9
update changelog
moh-osman3 May 3, 2024
7db9a40
improve readability with response.bytesToRelease
moh-osman3 May 7, 2024
8d45b90
remove deprecated
moh-osman3 May 7, 2024
8ea4949
gofmt
moh-osman3 May 7, 2024
aacba4e
rm newline
moh-osman3 May 7, 2024
b97b100
go mod tidy
moh-osman3 May 7, 2024
e9e2d31
add client address if available
moh-osman3 May 9, 2024
985ab4b
Merge branch 'main' into mohosman/apply-bounded-semaphore
jmacd May 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 116 additions & 104 deletions api/experimental/arrow/v1/arrow_service.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion api/experimental/arrow/v1/arrow_service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion collector/examples/bridge/edge-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ service:
address: 127.0.0.1:8888
level: detailed
logs:
level: info
level: info
2 changes: 1 addition & 1 deletion collector/examples/bridge/saas-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ service:
telemetry:
metrics:
address: 127.0.0.1:8889
level: normal
level: normal
2 changes: 1 addition & 1 deletion collector/exporter/otelarrowexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
go.opentelemetry.io/collector/config/configgrpc v0.98.0
go.opentelemetry.io/collector/config/configopaque v1.5.0
go.opentelemetry.io/collector/config/configretry v0.98.0
go.opentelemetry.io/collector/config/configtelemetry v0.98.0
go.opentelemetry.io/collector/config/configtls v0.98.0
go.opentelemetry.io/collector/confmap v0.98.0
go.opentelemetry.io/collector/consumer v0.98.0
Expand Down Expand Up @@ -75,6 +74,7 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/collector/config/confignet v0.98.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.98.0 // indirect
go.opentelemetry.io/collector/config/internal v0.98.0 // indirect
go.opentelemetry.io/collector/featuregate v1.5.0 // indirect
go.opentelemetry.io/collector/receiver v0.98.0 // indirect
Expand Down
23 changes: 10 additions & 13 deletions collector/exporter/otelarrowexporter/internal/arrow/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -285,18 +284,16 @@ func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {
// exporter, because of the optimization phase performed in the
// conversion to Arrow.
var uncompSize int
if e.telemetry.MetricsLevel > configtelemetry.LevelNormal {
switch data := data.(type) {
case ptrace.Traces:
var sizer ptrace.ProtoMarshaler
uncompSize = sizer.TracesSize(data)
case plog.Logs:
var sizer plog.ProtoMarshaler
uncompSize = sizer.LogsSize(data)
case pmetric.Metrics:
var sizer pmetric.ProtoMarshaler
uncompSize = sizer.MetricsSize(data)
}
switch data := data.(type) {
case ptrace.Traces:
var sizer ptrace.ProtoMarshaler
uncompSize = sizer.TracesSize(data)
case plog.Logs:
var sizer plog.ProtoMarshaler
uncompSize = sizer.LogsSize(data)
case pmetric.Metrics:
var sizer pmetric.ProtoMarshaler
uncompSize = sizer.MetricsSize(data)
}

wri := writeItem{
Expand Down
3 changes: 3 additions & 0 deletions collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
wri.errCh <- consumererror.NewPermanent(err)
return err
}
fmt.Println("EXPORTER SIDE UNCOMP SIZE")
fmt.Println(wri.uncompSize)
batch.UncompressedSize = uint64(wri.uncompSize)

// Optionally include outgoing metadata, if present.
if len(wri.md) != 0 {
Expand Down
7 changes: 7 additions & 0 deletions collector/receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type ArrowConfig struct {
// passing through, they will see ResourceExhausted errors.
MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"`

AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"`

// WaiterLimit is the limit on the number of waiters waiting to be processed and consumed.
// This is a dimension of memory limiting to ensure waiters are not consuming an
// unexpectedly large amount of memory in the arrow receiver.
WaiterLimit int64 `mapstructure:"waiter_limit"`

// Zstd settings apply to OTel-Arrow use of gRPC specifically.
Zstd zstd.DecoderConfig `mapstructure:"zstd"`
}
Expand Down
1 change: 1 addition & 0 deletions collector/receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func createDefaultConfig() component.Config {
},
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
// AdmissionLimitMiB: defaultMemoryLimitMiB/2,
},
},
}
Expand Down
4 changes: 4 additions & 0 deletions collector/receiver/otelarrowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/apache/arrow/go/v14 v14.0.2 // indirect
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
Expand All @@ -55,6 +57,7 @@ require (
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -66,6 +69,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/collector/config/configcompression v1.5.0 // indirect
Expand Down
1 change: 0 additions & 1 deletion collector/receiver/otelarrowreceiver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
github.com/open-telemetry/otel-arrow v0.22.0 h1:G1jgtqAM2ho5pyKQ4tyrDzk9Y0VcJ+GZQRJgN26vRlI=
github.com/open-telemetry/otel-arrow v0.22.0/go.mod h1:F50XFaiNfkfB0MYftZIUKFULm6pxfGqjbgQzevi+65M=
github.com/open-telemetry/otel-arrow/collector v0.22.0 h1:lHFjzkh5PbsiW8B63SRntnP9W7bLCXV9lslO4zI0s/Y=
github.com/open-telemetry/otel-arrow/collector v0.22.0/go.mod h1:R7hRwuGDxoGLB27dkJUFKDK7mGG7Yb02ODnLHx8Whis=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
Loading
Loading