8
8
"context"
9
9
"errors"
10
10
"fmt"
11
- "strings"
12
11
"sync"
13
12
"sync/atomic"
14
13
@@ -108,11 +107,11 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
108
107
p .Lock ()
109
108
defer p .Unlock ()
110
109
111
- var errs multierror
110
+ var err error
112
111
for _ , c := range p .callbacks {
113
112
// TODO make the callbacks parallel. ( #3034 )
114
- if err := c (ctx ); err != nil {
115
- errs . append (err )
113
+ if e := c (ctx ); e != nil {
114
+ err = errors . Join (err , e )
116
115
}
117
116
if err := ctx .Err (); err != nil {
118
117
rm .Resource = nil
@@ -123,8 +122,8 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
123
122
for e := p .multiCallbacks .Front (); e != nil ; e = e .Next () {
124
123
// TODO make the callbacks parallel. ( #3034 )
125
124
f := e .Value .(multiCallback )
126
- if err := f (ctx ); err != nil {
127
- errs . append (err )
125
+ if e := f (ctx ); e != nil {
126
+ err = errors . Join (err , e )
128
127
}
129
128
if err := ctx .Err (); err != nil {
130
129
// This means the context expired before we finished running callbacks.
@@ -160,7 +159,7 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
160
159
161
160
rm .ScopeMetrics = rm .ScopeMetrics [:i ]
162
161
163
- return errs . errorOrNil ()
162
+ return err
164
163
}
165
164
166
165
// inserter facilitates inserting of new instruments from a single scope into a
@@ -222,17 +221,17 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
222
221
measures []aggregate.Measure [N ]
223
222
)
224
223
225
- errs := & multierror { wrapped : errCreatingAggregators }
224
+ var err error
226
225
seen := make (map [uint64 ]struct {})
227
226
for _ , v := range i .pipeline .views {
228
227
stream , match := v (inst )
229
228
if ! match {
230
229
continue
231
230
}
232
231
matched = true
233
- in , id , err := i .cachedAggregator (inst .Scope , inst .Kind , stream , readerAggregation )
234
- if err != nil {
235
- errs . append (err )
232
+ in , id , e := i .cachedAggregator (inst .Scope , inst .Kind , stream , readerAggregation )
233
+ if e != nil {
234
+ err = errors . Join (err , e )
236
235
}
237
236
if in == nil { // Drop aggregation.
238
237
continue
@@ -245,8 +244,12 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
245
244
measures = append (measures , in )
246
245
}
247
246
247
+ if err != nil {
248
+ err = errors .Join (errCreatingAggregators , err )
249
+ }
250
+
248
251
if matched {
249
- return measures , errs . errorOrNil ()
252
+ return measures , err
250
253
}
251
254
252
255
// Apply implicit default view if no explicit matched.
@@ -255,15 +258,18 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
255
258
Description : inst .Description ,
256
259
Unit : inst .Unit ,
257
260
}
258
- in , _ , err := i .cachedAggregator (inst .Scope , inst .Kind , stream , readerAggregation )
259
- if err != nil {
260
- errs .append (err )
261
+ in , _ , e := i .cachedAggregator (inst .Scope , inst .Kind , stream , readerAggregation )
262
+ if e != nil {
263
+ if err == nil {
264
+ err = errCreatingAggregators
265
+ }
266
+ err = errors .Join (err , e )
261
267
}
262
268
if in != nil {
263
269
// Ensured to have not seen given matched was false.
264
270
measures = append (measures , in )
265
271
}
266
- return measures , errs . errorOrNil ()
272
+ return measures , err
267
273
}
268
274
269
275
// addCallback registers a single instrument callback to be run when
@@ -608,15 +614,15 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) reso
608
614
func (r resolver [N ]) Aggregators (id Instrument ) ([]aggregate.Measure [N ], error ) {
609
615
var measures []aggregate.Measure [N ]
610
616
611
- errs := & multierror {}
617
+ var err error
612
618
for _ , i := range r .inserters {
613
- in , err := i .Instrument (id , i .readerDefaultAggregation (id .Kind ))
614
- if err != nil {
615
- errs . append (err )
619
+ in , e := i .Instrument (id , i .readerDefaultAggregation (id .Kind ))
620
+ if e != nil {
621
+ err = errors . Join (err , e )
616
622
}
617
623
measures = append (measures , in ... )
618
624
}
619
- return measures , errs . errorOrNil ()
625
+ return measures , err
620
626
}
621
627
622
628
// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
@@ -625,37 +631,18 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
625
631
func (r resolver [N ]) HistogramAggregators (id Instrument , boundaries []float64 ) ([]aggregate.Measure [N ], error ) {
626
632
var measures []aggregate.Measure [N ]
627
633
628
- errs := & multierror {}
634
+ var err error
629
635
for _ , i := range r .inserters {
630
636
agg := i .readerDefaultAggregation (id .Kind )
631
637
if histAgg , ok := agg .(AggregationExplicitBucketHistogram ); ok && len (boundaries ) > 0 {
632
638
histAgg .Boundaries = boundaries
633
639
agg = histAgg
634
640
}
635
- in , err := i .Instrument (id , agg )
636
- if err != nil {
637
- errs . append (err )
641
+ in , e := i .Instrument (id , agg )
642
+ if e != nil {
643
+ err = errors . Join (err , e )
638
644
}
639
645
measures = append (measures , in ... )
640
646
}
641
- return measures , errs .errorOrNil ()
642
- }
643
-
644
- type multierror struct {
645
- wrapped error
646
- errors []string
647
- }
648
-
649
- func (m * multierror ) errorOrNil () error {
650
- if len (m .errors ) == 0 {
651
- return nil
652
- }
653
- if m .wrapped == nil {
654
- return errors .New (strings .Join (m .errors , "; " ))
655
- }
656
- return fmt .Errorf ("%w: %s" , m .wrapped , strings .Join (m .errors , "; " ))
657
- }
658
-
659
- func (m * multierror ) append (err error ) {
660
- m .errors = append (m .errors , err .Error ())
647
+ return measures , err
661
648
}
0 commit comments