Skip to content

Commit

Permalink
[chore] Remove experimental dead code related to streams and staleness
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Dec 20, 2024
1 parent 96368fa commit d58a040
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 352 deletions.
2 changes: 0 additions & 2 deletions internal/exp/metrics/identity/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
)

type metric = Metric

type Metric struct {
scope

Expand Down
4 changes: 2 additions & 2 deletions internal/exp/metrics/identity/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
)

type Stream struct {
metric
attrs [16]byte
metric Metric
attrs [16]byte
}

func (i Stream) Hash() hash.Hash64 {
Expand Down
16 changes: 14 additions & 2 deletions internal/exp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
)

// Merge will merge the metrics data in mdB into mdA, then return mdA.
Expand Down Expand Up @@ -111,7 +111,7 @@ outer:
return smA
}

func mergeDataPoints[DPS streams.DataPointSlice[DP], DP streams.DataPoint[DP]](dataPointsA DPS, dataPointsB DPS) DPS {
func mergeDataPoints[DPS dataPointSlice[DP], DP dataPoint[DP]](dataPointsA DPS, dataPointsB DPS) DPS {
// Append all the datapoints from B to A
for i := 0; i < dataPointsB.Len(); i++ {
dpB := dataPointsB.At(i)
Expand All @@ -122,3 +122,15 @@ func mergeDataPoints[DPS streams.DataPointSlice[DP], DP streams.DataPoint[DP]](d

return dataPointsA
}

type dataPointSlice[DP dataPoint[DP]] interface {
Len() int
At(i int) DP
AppendEmpty() DP
}

type dataPoint[Self any] interface {
Timestamp() pcommon.Timestamp
Attributes() pcommon.Map
CopyTo(dest Self)
}
134 changes: 0 additions & 134 deletions internal/exp/metrics/staleness/staleness.go

This file was deleted.

131 changes: 0 additions & 131 deletions internal/exp/metrics/staleness/staleness_test.go

This file was deleted.

40 changes: 40 additions & 0 deletions internal/exp/metrics/staleness/tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"

import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

type Tracker struct {
pq PriorityQueue
}

func NewTracker() Tracker {
return Tracker{pq: NewPriorityQueue()}
}

func (tr Tracker) Refresh(ts time.Time, ids ...identity.Stream) {
for _, id := range ids {
tr.pq.Update(id, ts)
}
}

func (tr Tracker) Collect(maxDuration time.Duration) []identity.Stream {
now := time.Now()

var ids []identity.Stream
for tr.pq.Len() > 0 {
_, ts := tr.pq.Peek()
if now.Sub(ts) < maxDuration {
break
}
id, _ := tr.pq.Pop()
ids = append(ids, id)
}

return ids
}
Loading

0 comments on commit d58a040

Please sign in to comment.