Skip to content

Commit

Permalink
feat: Try sync.pool to manage events
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeGoldsmith committed Feb 10, 2025
1 parent f440a83 commit f54052c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 30 deletions.
5 changes: 5 additions & 0 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ func (i *InMemCollector) ProcessSpanImmediately(sp *types.Span) (processed bool,

if !keep {
i.Metrics.Increment("dropped_from_stress")
types.DisposeEvent(&sp.Event)
return true, false
}

Expand Down Expand Up @@ -938,6 +939,7 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe
return
}
i.Logger.Debug().WithField("trace_id", sp.TraceID).Logf("Dropping span because of previous decision to drop trace")
types.DisposeEvent(&sp.Event)
}

func mergeTraceAndSpanSampleRates(sp *types.Span, traceSampleRate uint, dryRunMode bool) {
Expand Down Expand Up @@ -991,6 +993,9 @@ func (i *InMemCollector) send(ctx context.Context, trace *types.Trace, td *Trace
if !td.Kept && !i.Config.GetIsDryRun() {
i.Metrics.Increment("trace_send_dropped")
i.Logger.Info().WithFields(logFields).Logf("Dropping trace because of sampling decision")
for _, sp := range trace.GetSpans() {
types.DisposeEvent(&sp.Event)
}
return
}

Expand Down
34 changes: 4 additions & 30 deletions route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,16 +437,7 @@ func (r *Router) requestToEvent(ctx context.Context, req *http.Request, reqBod [
return nil, err
}

return &types.Event{
Context: ctx,
APIHost: apiHost,
APIKey: apiKey,
Dataset: dataset,
Environment: environment,
SampleRate: uint(sampleRate),
Timestamp: eventTime,
Data: data,
}, nil
return types.NewEvent(ctx, apiHost, apiKey, dataset, environment, uint(sampleRate), eventTime, data), nil
}

func (r *Router) batch(w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -497,16 +488,7 @@ func (r *Router) batch(w http.ResponseWriter, req *http.Request) {
userAgent := getUserAgentFromRequest(req)
batchedResponses := make([]*BatchResponse, 0, len(batchedEvents))
for _, bev := range batchedEvents {
ev := &types.Event{
Context: ctx,
APIHost: apiHost,
APIKey: apiKey,
Dataset: dataset,
Environment: environment,
SampleRate: bev.getSampleRate(),
Timestamp: bev.getEventTime(),
Data: bev.Data,
}
ev := types.NewEvent(ctx, apiHost, apiKey, dataset, environment, bev.getSampleRate(), bev.getEventTime(), bev.Data)

addIncomingUserAgent(ev, userAgent)
err = r.processEvent(ev, reqID)
Expand Down Expand Up @@ -551,16 +533,7 @@ func (router *Router) processOTLPRequest(

for _, batch := range batches {
for _, ev := range batch.Events {
event := &types.Event{
Context: ctx,
APIHost: apiHost,
APIKey: apiKey,
Dataset: batch.Dataset,
Environment: environment,
SampleRate: uint(ev.SampleRate),
Timestamp: ev.Timestamp,
Data: ev.Attributes,
}
event := types.NewEvent(ctx, apiHost, apiKey, batch.Dataset, environment, uint(ev.SampleRate), ev.Timestamp, ev.Attributes)
addIncomingUserAgent(event, incomingUserAgent)
if err = router.processEvent(event, requestID); err != nil {
router.Logger.Error().Logf("Error processing event: " + err.Error())
Expand Down Expand Up @@ -658,6 +631,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
if err != nil {
r.Metrics.Increment(r.incomingOrPeer + "_router_dropped")
debugLog.Logf("Dropping span from batch, channel full")
types.DisposeEvent(&span.Event)
return err
}

Expand Down
1 change: 1 addition & 0 deletions transmit/transmit.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (d *DefaultTransmission) EnqueueEvent(ev *types.Event) {
Logf("failed to enqueue event")
}
d.Metrics.Up(updownQueuedItems)
types.DisposeEvent(ev)
}

func (d *DefaultTransmission) EnqueueSpan(sp *types.Span) {
Expand Down
44 changes: 44 additions & 0 deletions types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"context"
"slices"
"sync"
"time"

huskyotlp "github.com/honeycombio/husky/otlp"
Expand Down Expand Up @@ -33,6 +34,49 @@ type Event struct {
Data map[string]interface{}
}

func (e *Event) reset() {
e.Context = context.Background()
e.APIHost = ""
e.APIKey = ""
e.Dataset = ""
e.Environment = ""
e.SampleRate = 0
e.Timestamp = time.Time{}
clear(e.Data)
}

var pooledEvents = sync.Pool{
New: func() interface{} {
return &Event{}
},
}

func NewEvent(ctx context.Context, apiHost, apiKey, dataset, environment string, sampleRate uint, timestamp time.Time, data map[string]interface{}) *Event {
event := pooledEvents.Get().(*Event)
event.Context = ctx
event.APIHost = apiHost
event.APIKey = apiKey
event.Dataset = dataset
event.Environment = environment
event.SampleRate = sampleRate
event.Timestamp = timestamp
event.Data = data
return event
}

func NewDecisionEvent(ctx context.Context, apiKey string, dataset string) *Event {
event := pooledEvents.Get().(*Event)
event.Context = ctx
event.APIKey = apiKey
event.Dataset = dataset
return event
}

func DisposeEvent(event *Event) {
event.reset()
pooledEvents.Put(event)
}

// Trace isn't something that shows up on the wire; it gets created within
// Refinery. Traces are not thread-safe; only one goroutine should be working
// with a trace object at a time.
Expand Down

0 comments on commit f54052c

Please sign in to comment.