Skip to content

Commit

Permalink
Merge branch 'main' into release_05_09_24
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd authored May 9, 2024
2 parents 4d2b476 + f8c64ed commit 1533c8e
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 34 deletions.
24 changes: 12 additions & 12 deletions collector/exporter/otelarrowexporter/internal/arrow/bestofn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type bestOfNPrioritizer struct {
// state tracks the work being handled by all streams.
state []*streamWorkState

// N is the number of streams to consder in each decision.
N int
// numChoices is the number of streams to consder in each decision.
numChoices int

// loadFunc is the load function.
loadFunc loadFunc
Expand All @@ -42,11 +42,11 @@ type streamSorter struct {

var _ streamPrioritizer = &bestOfNPrioritizer{}

func newBestOfNPrioritizer(dc doneCancel, N, numStreams int, lf loadFunc) (*bestOfNPrioritizer, []*streamWorkState) {
func newBestOfNPrioritizer(dc doneCancel, numChoices, numStreams int, lf loadFunc) (*bestOfNPrioritizer, []*streamWorkState) {
var state []*streamWorkState

// Limit N to the number of streams.
N = min(numStreams, N)
// Limit numChoices to the number of streams.
numChoices = min(numStreams, numChoices)

for i := 0; i < numStreams; i++ {
ws := &streamWorkState{
Expand All @@ -61,7 +61,7 @@ func newBestOfNPrioritizer(dc doneCancel, N, numStreams int, lf loadFunc) (*best
doneCancel: dc,
input: make(chan writeItem, runtime.NumCPU()),
state: state,
N: N,
numChoices: numChoices,
loadFunc: lf,
}

Expand Down Expand Up @@ -118,7 +118,7 @@ func (lp *bestOfNPrioritizer) sendAndWait(ctx context.Context, errCh <-chan erro
}
}

func (lp *bestOfNPrioritizer) nextWriter(ctx context.Context) streamWriter {
func (lp *bestOfNPrioritizer) nextWriter() streamWriter {
select {
case <-lp.done:
// In case of downgrade, return nil to return into a
Expand All @@ -135,17 +135,17 @@ func (lp *bestOfNPrioritizer) streamFor(_ writeItem, rnd *rand.Rand, tmp []strea
for idx, item := range lp.state {
tmp[idx].work = item
}
// Select N at random by shifting the selection into the start
// Select numChoices at random by shifting the selection into the start
// of the temporary slice.
for i := 0; i < lp.N; i++ {
pick := rand.Intn(lp.N - i)
for i := 0; i < lp.numChoices; i++ {
pick := rnd.Intn(lp.numChoices - i)
tmp[i], tmp[i+pick] = tmp[i+pick], tmp[i]
}
for i := 0; i < lp.N; i++ {
for i := 0; i < lp.numChoices; i++ {
// TODO: skip channels w/ a pending item (maybe)
tmp[i].load = lp.loadFunc(tmp[i].work)
}
sort.Slice(tmp[0:lp.N], func(i, j int) bool {
sort.Slice(tmp[0:lp.numChoices], func(i, j int) bool {
return tmp[i].load < tmp[j].load
})
return tmp[0].work
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {
}

for {
writer := e.ready.nextWriter(ctx)
writer := e.ready.nextWriter()

if writer == nil {
return false, nil // a downgraded connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,6 @@ func statusOKFor(id int64) *arrowpb.BatchStatus {
}
}

func statusCanceledFor(id int64) *arrowpb.BatchStatus {
return &arrowpb.BatchStatus{
BatchId: id,
StatusCode: arrowpb.StatusCode_CANCELED,
}
}

func statusUnavailableFor(id int64) *arrowpb.BatchStatus {
return &arrowpb.BatchStatus{
BatchId: id,
Expand Down Expand Up @@ -725,6 +718,8 @@ func TestArrowExporterStreamLifetimeAndShutdown(t *testing.T) {
for _, numStreams := range []int{1, 2, 8} {
t.Run(fmt.Sprint(numStreams), func(t *testing.T) {
tc := newShortLifetimeStreamTestCase(t, pname, numStreams)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup

Expand Down Expand Up @@ -757,14 +752,12 @@ func TestArrowExporterStreamLifetimeAndShutdown(t *testing.T) {
return tc.returnNewStream(channel)(ctx, opts...)
})

bg := context.Background()
require.NoError(t, tc.exporter.Start(bg))
require.NoError(t, tc.exporter.Start(ctx))

start := time.Now()
// This is 10 stream lifetimes using the "ShortLifetime" test.
for time.Since(start) < 5*time.Second {
input := testdata.GenerateTraces(2)
ctx := context.Background()

sent, err := tc.exporter.SendAndWait(ctx, input)
require.NoError(t, err)
Expand All @@ -773,10 +766,11 @@ func TestArrowExporterStreamLifetimeAndShutdown(t *testing.T) {
expectCount++
}

require.NoError(t, tc.exporter.Shutdown(bg))
require.NoError(t, tc.exporter.Shutdown(ctx))

require.Equal(t, expectCount, actualCount)

cancel()
wg.Wait()

require.Empty(t, tc.observedLogs.All())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
type streamPrioritizer interface {
// nextWriter gets the next stream writer. In case the exporter
// was downgraded, returns nil.
nextWriter(context.Context) streamWriter
nextWriter() streamWriter

// downgrade is called with the root context of the exporter,
// and may block indefinitely. this allows the prioritizer to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *Stream) run(ctx context.Context, dc doneCancel, streamClient StreamClie
// the caller waiting on its error channel.
func (s *Stream) write(ctx context.Context) (retErr error) {
// always close send()
defer s.client.CloseSend()
defer func() { _ = s.client.CloseSend() }()

// headers are encoding using hpack, reusing a buffer on each call.
var hdrsBuf bytes.Buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,13 @@ func (tc *streamTestCase) connectTestStream(h testChannel) func(context.Context,

// get returns the stream via the prioritizer it is registered with.
func (tc *streamTestCase) mustGet() streamWriter {
stream := tc.prioritizer.nextWriter(context.Background())
stream := tc.prioritizer.nextWriter()
if stream == nil {
panic("unexpected nil stream")
}
return stream
}

func (tc *streamTestCase) get() streamWriter {
return tc.prioritizer.nextWriter(context.Background())
}

func (tc *streamTestCase) mustSendAndWait() error {
ctx := context.Background()
ch := make(chan error, 1)
Expand Down
2 changes: 1 addition & 1 deletion collector/receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestUnmarshalConfigUnix(t *testing.T) {
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "/tmp/grpc_otlp.sock",
Transport: "unix",
Transport: confignet.TransportTypeUnix,
},
ReadBufferSize: 512 * 1024,
},
Expand Down
4 changes: 2 additions & 2 deletions collector/receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
const (
defaultGRPCEndpoint = "0.0.0.0:4317"

defaultMemoryLimitMiB = 128
defaultMemoryLimitMiB = 128
defaultAdmissionLimitMiB = defaultMemoryLimitMiB / 2
)

Expand All @@ -46,7 +46,7 @@ func createDefaultConfig() component.Config {
ReadBufferSize: 512 * 1024,
},
Arrow: ArrowConfig{
MemoryLimitMiB: defaultMemoryLimitMiB,
MemoryLimitMiB: defaultMemoryLimitMiB,
AdmissionLimitMiB: defaultAdmissionLimitMiB,
},
},
Expand Down

0 comments on commit 1533c8e

Please sign in to comment.