Skip to content

Commit

Permalink
New e2e test; use uint64 for consistency w/ config
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd committed Oct 28, 2024
1 parent 845b138 commit e9b115a
Show file tree
Hide file tree
Showing 14 changed files with 222 additions and 222 deletions.
26 changes: 13 additions & 13 deletions internal/otelarrow/admission/boundedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ var ErrRequestTooLarge = status.Errorf(grpccodes.InvalidArgument, "rejecting req

// BoundedQueue is a LIFO-oriented admission-controlled Queue.
type BoundedQueue struct {
maxLimitAdmit int64
maxLimitWait int64
maxLimitAdmit uint64
maxLimitWait uint64
tracer trace.Tracer

// lock protects currentAdmitted, currentWaiting, and waiters

lock sync.Mutex
currentAdmitted int64
currentWaiting int64
currentAdmitted uint64
currentWaiting uint64
waiters *list.List // of *waiter
}

Expand All @@ -37,13 +37,13 @@ var _ Queue = &BoundedQueue{}
// waiter is an item in the BoundedQueue waiters list.
type waiter struct {
notify N
pending int64
pending uint64
}

// NewBoundedQueue returns a LIFO-oriented Queue implementation which
// admits `maxLimitAdmit` bytes concurrently and allows up to
// `maxLimitWait` bytes to wait for admission.
func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait int64) Queue {
func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) Queue {
return &BoundedQueue{
maxLimitAdmit: maxLimitAdmit,
maxLimitWait: maxLimitWait,
Expand All @@ -58,7 +58,7 @@ func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait
// - element=nil, error=nil: the fast success path
// - element=nil, error=non-nil: the fast failure path
// - element=non-nil, error=non-nil: the slow success path
func (bq *BoundedQueue) acquireOrGetWaiter(pending int64) (*list.Element, error) {
func (bq *BoundedQueue) acquireOrGetWaiter(pending uint64) (*list.Element, error) {
bq.lock.Lock()
defer bq.lock.Unlock()

Expand All @@ -84,10 +84,10 @@ func (bq *BoundedQueue) acquireOrGetWaiter(pending int64) (*list.Element, error)
}

// Acquire implements Queue.
func (bq *BoundedQueue) Acquire(ctx context.Context, pending int64) (ReleaseFunc, error) {
func (bq *BoundedQueue) Acquire(ctx context.Context, pending uint64) (ReleaseFunc, error) {
element, err := bq.acquireOrGetWaiter(pending)
parentSpan := trace.SpanFromContext(ctx)
pendingAttr := trace.WithAttributes(attribute.Int64("pending", pending))
pendingAttr := trace.WithAttributes(attribute.Int64("pending", int64(pending)))

if err != nil {
parentSpan.AddEvent("admission rejected (fast path)", pendingAttr)
Expand Down Expand Up @@ -149,25 +149,25 @@ func (bq *BoundedQueue) admitWaitersLocked() {
}
}

func (bq *BoundedQueue) addWaiterLocked(pending int64) *list.Element {
func (bq *BoundedQueue) addWaiterLocked(pending uint64) *list.Element {
bq.currentWaiting += pending
return bq.waiters.PushBack(&waiter{
pending: pending,
notify: newNotification(),
})
}

func (bq *BoundedQueue) removeWaiterLocked(pending int64, element *list.Element) {
func (bq *BoundedQueue) removeWaiterLocked(pending uint64, element *list.Element) {
bq.currentWaiting -= pending
bq.waiters.Remove(element)
}

func (bq *BoundedQueue) releaseLocked(pending int64) {
func (bq *BoundedQueue) releaseLocked(pending uint64) {
bq.currentAdmitted -= pending
bq.admitWaitersLocked()
}

func (bq *BoundedQueue) releaseFunc(pending int64) ReleaseFunc {
func (bq *BoundedQueue) releaseFunc(pending uint64) ReleaseFunc {
return func() {
bq.lock.Lock()
defer bq.lock.Unlock()
Expand Down
22 changes: 11 additions & 11 deletions internal/otelarrow/admission/boundedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ type bqTest struct {

var noopTelemetry = componenttest.NewNopTelemetrySettings()

func newBQTest(maxAdmit, maxWait int64) bqTest {
func newBQTest(maxAdmit, maxWait uint64) bqTest {
return bqTest{
BoundedQueue: NewBoundedQueue(noopTelemetry, maxAdmit, maxWait).(*BoundedQueue),
}
}

func (bq *bqTest) startWaiter(t *testing.T, ctx context.Context, size int64, relp *ReleaseFunc) N {
func (bq *bqTest) startWaiter(t *testing.T, ctx context.Context, size uint64, relp *ReleaseFunc) N {
n := newNotification()
go func() {
var err error
Expand All @@ -41,7 +41,7 @@ func (bq *bqTest) startWaiter(t *testing.T, ctx context.Context, size int64, rel
return n
}

func (bq *bqTest) waitForPending(t *testing.T, admitted, waiting int64) {
func (bq *bqTest) waitForPending(t *testing.T, admitted, waiting uint64) {
require.Eventually(t, func() bool {
bq.lock.Lock()
defer bq.lock.Unlock()
Expand All @@ -56,26 +56,26 @@ func abs(x int64) int64 {
return x
}

func mkRepeat(x int64, n int) []int64 {
func mkRepeat(x uint64, n int) []uint64 {
if n == 0 {
return nil
}
return append(mkRepeat(x, n-1), x)
}

func mkRange(from, to int64) []int64 {
func mkRange(from, to uint64) []uint64 {
if from > to {
return nil
}
return append([]int64{from}, mkRange(from+1, to)...)
return append([]uint64{from}, mkRange(from+1, to)...)
}

func TestBoundedQueueLimits(t *testing.T) {
for _, test := range []struct {
name string
maxLimitAdmit int64
maxLimitWait int64
requestSizes []int64
maxLimitAdmit uint64
maxLimitWait uint64
requestSizes []uint64
timeout time.Duration
expectErrs map[string]int
}{
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestBoundedQueueLimits(t *testing.T) {
name: "with_size_exceeded",
maxLimitAdmit: 1000,
maxLimitWait: 2000,
requestSizes: []int64{1001},
requestSizes: []uint64{1001},
timeout: 0,
expectErrs: map[string]int{
ErrRequestTooLarge.Error(): 1,
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestBoundedQueueLimits(t *testing.T) {
require.Equal(t, test.expectErrs, errCounts)

// Make sure we can allocate the whole limit at end-of-test.
release, err := bq.Acquire(ctx, int64(test.maxLimitAdmit))
release, err := bq.Acquire(ctx, test.maxLimitAdmit)
assert.NoError(t, err)
release()

Expand Down
4 changes: 2 additions & 2 deletions internal/otelarrow/admission/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Queue interface {
//
// In case (3), the return value will be a ResourceExhausted
// error.
Acquire(ctx context.Context, weight int64) (ReleaseFunc, error)
Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error)
}

// ReleaseFunc is returned by Acquire when the Acquire() was admitted.
Expand All @@ -48,6 +48,6 @@ func NewUnboundedQueue() Queue {
func noopRelease() {}

// Acquire implements Queue.
func (noopController) Acquire(ctx context.Context, weight int64) (ReleaseFunc, error) {
func (noopController) Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error) {
return noopRelease, nil
}
124 changes: 113 additions & 11 deletions internal/otelarrow/test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ import (
)

type testParams struct {
threadCount int
requestUntil func(*testConsumer) bool
threadCount int
requestWhileTrue func(*testConsumer) bool

// missingDeadline is configured so the zero value implies a deadline,
// which is the default.
Expand Down Expand Up @@ -107,7 +107,7 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
tset := componenttest.NewNopTelemetrySettings()

core, obslogs := observer.New(zapcore.InfoLevel)
core, obslogs := observer.New(zapcore.DebugLevel)

exp := tracetest.NewInMemoryExporter()

Expand Down Expand Up @@ -228,7 +228,7 @@ func testIntegrationTraces(ctx context.Context, t *testing.T, tp testParams, cfg
go func(num int) {
defer clientDoneWG.Done()
generator := mkgen()
for i := 0; tp.requestUntil(testCon); i++ {
for i := 0; tp.requestWhileTrue(testCon); i++ {
td := generator(i)

errf(t, exporter.ConsumeTraces(ctx, td))
Expand Down Expand Up @@ -395,18 +395,43 @@ func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer,
eSigs, eMsgs := logSigs(testCon.expLogs)
rSigs, rMsgs := logSigs(testCon.recvLogs)

// Test for arrow receiver stream errors on both sides.
// Test for arrow stream errors on both sides.
require.Positive(t, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eMsgs)
require.Positive(t, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs)

// Ensure both side's error logs include memory limit errors
// one way or another.
// Ensure both side's error logs include admission control errors.
require.Positive(t, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs)
require.Positive(t, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs)

return nil, nil
}

var admissionRegexp = regexp.MustCompile(`too much pending data`)

func countAdmissionLimitErrors(msgs []string) (cnt int) {
for _, msg := range msgs {
if admissionRegexp.MatchString(msg) {
cnt++
}
}
return
}

func failureAdmissionLimitEnding(t *testing.T, _ testParams, testCon *testConsumer, _ [][]ptrace.Traces) (rops, eops map[string]int) {
eSigs, eMsgs := logSigs(testCon.expLogs)
rSigs, rMsgs := logSigs(testCon.recvLogs)

// Test for arrow stream errors on both sides.
require.Positive(t, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eMsgs)
require.Positive(t, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs)

// Ensure both side's error logs include admission limit errors.
require.Positive(t, countAdmissionLimitErrors(rMsgs), "should have admission limit errors: %v", rMsgs)
require.Positive(t, countAdmissionLimitErrors(eMsgs), "should have admission limit errors: %v", eMsgs)

return nil, nil
}

func consumerSuccess(t *testing.T, err error) {
require.NoError(t, err)
}
Expand Down Expand Up @@ -441,7 +466,7 @@ func TestIntegrationTracesSimple(t *testing.T) {
// until 10 threads can write 1000 spans
var params = testParams{
threadCount: 10,
requestUntil: func(test *testConsumer) bool {
requestWhileTrue: func(test *testConsumer) bool {
return test.sink.SpanCount() < 1000
},
}
Expand All @@ -462,7 +487,7 @@ func TestIntegrationDeadlinePropagation(t *testing.T) {
// Until at least one span is written.
var params = testParams{
threadCount: 1,
requestUntil: func(test *testConsumer) bool {
requestWhileTrue: func(test *testConsumer) bool {
return test.sink.SpanCount() < 1
},
missingDeadline: !hasDeadline,
Expand All @@ -487,7 +512,7 @@ func TestIntegrationMemoryLimited(t *testing.T) {
// until exporter and receiver finish at least one ArrowTraces span.
params := testParams{
threadCount: 10,
requestUntil: func(test *testConsumer) bool {
requestWhileTrue: func(test *testConsumer) bool {
cf := func(spans tracetest.SpanStubs) (cnt int) {
for _, span := range spans {
if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
Expand Down Expand Up @@ -575,7 +600,7 @@ func TestIntegrationSelfTracing(t *testing.T) {
// until 2 Arrow stream spans are received from self instrumentation
var params = testParams{
threadCount: 10,
requestUntil: func(test *testConsumer) bool {
requestWhileTrue: func(test *testConsumer) bool {

cnt := 0
for _, span := range test.expSpans.GetSpans() {
Expand All @@ -596,3 +621,80 @@ func TestIntegrationSelfTracing(t *testing.T) {
}
}, func() GenFunc { return makeTestTraces }, consumerSuccess, multiStreamEnding)
}

func nearLimitGenFunc() MkGen {
var sizer ptrace.ProtoMarshaler
const nearLimit = 900 << 10 // close to 1 MiB
const hardLimit = 1 << 20 // 1 MiB

return func() GenFunc {
entropy := datagen.NewTestEntropy(int64(rand.Uint64())) //nolint:gosec // only used for testing

tracesGen := datagen.NewTracesGenerator(
entropy,
entropy.NewStandardResourceAttributes(),
entropy.NewStandardInstrumentationScopes(),
)

return func(x int) ptrace.Traces {
size := 100
for {
td := tracesGen.Generate(size, time.Minute)
uncomp := sizer.TracesSize(td)
if uncomp > nearLimit && uncomp < hardLimit {
return td
}
if uncomp > hardLimit {
size -= 10
} else if uncomp < nearLimit/2 {
size *= 2
} else {
size += 10
}
}

}
}
}

func TestIntegrationAdmissionLimited(t *testing.T) {
for _, allowWait := range []bool{false, true} {
t.Run(fmt.Sprint("allow_wait=", allowWait), func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// until exporter and receiver finish at least one ArrowTraces span.
params := testParams{
threadCount: 10,
requestWhileTrue: func(test *testConsumer) bool {
return test.sink.SpanCount() < 10000
},
}

var ending func(*testing.T, testParams, *testConsumer, [][]ptrace.Traces) (_, _ map[string]int)
var waitingLimit uint64
if allowWait {
ending = standardEnding
waitingLimit = uint64(params.threadCount)
} else {
ending = failureAdmissionLimitEnding
waitingLimit = 0
}

testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) {
rcfg.Admission.RequestLimitMiB = 1
// either allow, or don't allow, all threads to wait
rcfg.Admission.WaitingLimitMiB = waitingLimit
ecfg.Arrow.NumStreams = 10

// Shorten timeouts for this test, because we intend
// for it to fail and don't want to wait for retries.
ecfg.TimeoutSettings.Timeout = 5 * time.Second
ecfg.RetryConfig.InitialInterval = 1 * time.Second
ecfg.RetryConfig.MaxInterval = 2 * time.Second
ecfg.RetryConfig.MaxElapsedTime = 10 * time.Second
ecfg.Arrow.MaxStreamLifetime = 5 * time.Second
}, nearLimitGenFunc(), consumerFailure, ending)
})
}
}
Loading

0 comments on commit e9b115a

Please sign in to comment.