Skip to content

Commit

Permalink
[chore]: [deltatocumulative]: remove nested implementation (#36498)
Browse files Browse the repository at this point in the history
#### Description

Removes the nested (aka overloading `streams.Map`) implementation.

This has been entirely replaced by a leaner, "linear" implementation:
-
#35048
-
#36486

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Existing tests continue to pass unaltered

<!--Describe the documentation added.-->
#### Documentation

not needed

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
sh0rez authored Dec 3, 2024
1 parent 5002476 commit 0d6b220
Show file tree
Hide file tree
Showing 28 changed files with 188 additions and 1,896 deletions.
51 changes: 0 additions & 51 deletions processor/deltatocumulativeprocessor/chain.go

This file was deleted.

2 changes: 1 addition & 1 deletion processor/deltatocumulativeprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"go.opentelemetry.io/collector/component"

telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
)

var _ component.ConfigValidator = (*Config)(nil)
Expand Down
9 changes: 3 additions & 6 deletions processor/deltatocumulativeprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"

ltel "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
)

func NewFactory() processor.Factory {
Expand All @@ -29,13 +29,10 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo
return nil, fmt.Errorf("configuration parsing error")
}

ltel, err := ltel.New(set.TelemetrySettings)
tel, err := telemetry.New(set.TelemetrySettings)
if err != nil {
return nil, err
}

proc := newProcessor(pcfg, set.Logger, &ltel.TelemetryBuilder, next)
linear := newLinear(pcfg, ltel, proc)

return Chain{linear, proc}, nil
return newProcessor(pcfg, tel, next), nil
}
2 changes: 1 addition & 1 deletion processor/deltatocumulativeprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.32.0
go.opentelemetry.io/otel/trace v1.32.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/tools v0.26.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down Expand Up @@ -52,6 +51,7 @@ require (
go.opentelemetry.io/collector/processor/processorprofiles v0.114.1-0.20241202231142-b9ff1bc54c99 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.19.0 // indirect
Expand Down
93 changes: 0 additions & 93 deletions processor/deltatocumulativeprocessor/internal/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,116 +4,23 @@
package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

var (
_ Point[Number] = Number{}
_ Point[Histogram] = Histogram{}
_ Point[ExpHistogram] = ExpHistogram{}
_ Point[Summary] = Summary{}
)

type Point[Self any] interface {
StartTimestamp() pcommon.Timestamp
Timestamp() pcommon.Timestamp
Attributes() pcommon.Map

Clone() Self
CopyTo(Self)

Add(Self) Self
}

type Typed[Self any] interface {
Point[Self]
Number | Histogram | ExpHistogram | Summary
}

type Number struct {
pmetric.NumberDataPoint
}

func Zero[P Typed[P]]() P {
var point P
switch ty := any(&point).(type) {
case *Number:
ty.NumberDataPoint = pmetric.NewNumberDataPoint()
case *Histogram:
ty.HistogramDataPoint = pmetric.NewHistogramDataPoint()
case *ExpHistogram:
ty.DataPoint = pmetric.NewExponentialHistogramDataPoint()
}
return point
}

func (dp Number) Clone() Number {
clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()}
if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Number) CopyTo(dst Number) {
dp.NumberDataPoint.CopyTo(dst.NumberDataPoint)
}

type Histogram struct {
pmetric.HistogramDataPoint
}

func (dp Histogram) Clone() Histogram {
clone := Histogram{HistogramDataPoint: pmetric.NewHistogramDataPoint()}
if dp.HistogramDataPoint != (pmetric.HistogramDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Histogram) CopyTo(dst Histogram) {
dp.HistogramDataPoint.CopyTo(dst.HistogramDataPoint)
}

type ExpHistogram struct {
expo.DataPoint
}

func (dp ExpHistogram) Clone() ExpHistogram {
clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()}
if dp.DataPoint != (expo.DataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp ExpHistogram) CopyTo(dst ExpHistogram) {
dp.DataPoint.CopyTo(dst.DataPoint)
}

type mustPoint[D Point[D]] struct{ _ D }

var (
_ = mustPoint[Number]{}
_ = mustPoint[Histogram]{}
_ = mustPoint[ExpHistogram]{}
)

type Summary struct {
pmetric.SummaryDataPoint
}

func (dp Summary) Clone() Summary {
clone := Summary{SummaryDataPoint: pmetric.NewSummaryDataPoint()}
if dp.SummaryDataPoint != (pmetric.SummaryDataPoint{}) {
dp.CopyTo(clone)
}
return clone
}

func (dp Summary) CopyTo(dst Summary) {
dp.SummaryDataPoint.CopyTo(dst.SummaryDataPoint)
}
54 changes: 0 additions & 54 deletions processor/deltatocumulativeprocessor/internal/delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,9 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

func New[D data.Point[D]]() Accumulator[D] {
return Accumulator[D]{
Map: make(exp.HashMap[D]),
}
}

var _ streams.Map[data.Number] = (*Accumulator[data.Number])(nil)

type Accumulator[D data.Point[D]] struct {
streams.Map[D]
}

func (a Accumulator[D]) Store(id streams.Ident, dp D) error {
aggr, ok := a.Map.Load(id)

// new series: initialize with current sample
if !ok {
clone := dp.Clone()
return a.Map.Store(id, clone)
}

// drop bad samples
switch {
case dp.StartTimestamp() < aggr.StartTimestamp():
// belongs to older series
return ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()}
case dp.Timestamp() <= aggr.Timestamp():
// out of order
return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
}

// detect gaps
var gap error
if dp.StartTimestamp() > aggr.Timestamp() {
gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()}
}

res := aggr.Add(dp)
if err := a.Map.Store(id, res); err != nil {
return err
}
return gap
}

type ErrOlderStart struct {
Start pcommon.Timestamp
Sample pcommon.Timestamp
Expand All @@ -76,14 +30,6 @@ func (e ErrOutOfOrder) Error() string {
return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
}

type ErrGap struct {
From, To pcommon.Timestamp
}

func (e ErrGap) Error() string {
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
}

type Type interface {
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint

Expand Down
Loading

0 comments on commit 0d6b220

Please sign in to comment.