From 13a70a5a1cb3f0358afff6c378ead03a5e2e7956 Mon Sep 17 00:00:00 2001 From: Matthias Loibl Date: Thu, 28 Nov 2024 15:40:27 +0100 Subject: [PATCH] pkg: Create generic AttributesStore Rather than creating many implementations of the AttributesStore per type, let's use the new Go generic types. Starting to use this code as a library I was wondering if this was possible. Turns out it is. --- pkg/arrow/from_record.go | 56 ++++++ pkg/otel/common/otlp/attributes.go | 209 +++------------------ pkg/otel/common/otlp/resource.go | 2 +- pkg/otel/common/otlp/scope.go | 2 +- pkg/otel/logs/otlp/related_data.go | 18 +- pkg/otel/metrics/otlp/ehistogram.go | 2 +- pkg/otel/metrics/otlp/exemplar.go | 2 +- pkg/otel/metrics/otlp/histogram.go | 2 +- pkg/otel/metrics/otlp/number_data_point.go | 4 +- pkg/otel/metrics/otlp/related_data.go | 54 +++--- pkg/otel/metrics/otlp/summary.go | 2 +- pkg/otel/traces/otlp/event.go | 2 +- pkg/otel/traces/otlp/link.go | 4 +- pkg/otel/traces/otlp/related_data.go | 30 +-- 14 files changed, 140 insertions(+), 249 deletions(-) diff --git a/pkg/arrow/from_record.go b/pkg/arrow/from_record.go index 40c7529a..96c97fd8 100644 --- a/pkg/arrow/from_record.go +++ b/pkg/arrow/from_record.go @@ -20,13 +20,69 @@ package arrow // Utility functions to extract values from Arrow Records. import ( + "fmt" + "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/array" + "golang.org/x/exp/constraints" "github.com/open-telemetry/otel-arrow/pkg/otel/common" "github.com/open-telemetry/otel-arrow/pkg/werror" ) +// UnsignedFromRecord returns the unsigned value for a specific row and column in an +// Arrow record. If the value is null, it returns 0. +func UnsignedFromRecord[unsigned constraints.Unsigned](record arrow.Record, fieldID int, row int) (unsigned, error) { + if fieldID == AbsentFieldID { + return 0, nil + } + + arr := record.Column(fieldID) + if arr == nil { + return 0, nil + } + + switch arr := arr.(type) { + case *array.Dictionary: + switch dict := arr.Dictionary().(type) { + case *array.Uint32: + if arr.IsNull(row) { + return 0, nil + } else { + return unsigned(dict.Value(arr.GetValueIndex(row))), nil + } + default: + return 0, werror.WrapWithMsg(ErrInvalidArrayType, fmt.Sprintf("dictionary of type %T is unsupported", dict)) + } + case *array.Uint8: + if arr.IsNull(row) { + return 0, nil + } else { + return unsigned(arr.Value(row)), nil + } + case *array.Uint16: + if arr.IsNull(row) { + return 0, nil + } else { + return unsigned(arr.Value(row)), nil + } + case *array.Uint32: + if arr.IsNull(row) { + return 0, nil + } else { + return unsigned(arr.Value(row)), nil + } + case *array.Uint64: + if arr.IsNull(row) { + return 0, nil + } else { + return unsigned(arr.Value(row)), nil + } + default: + return 0, werror.WrapWithMsg(ErrInvalidArrayType, fmt.Sprintf("array of type %T is unsupported", arr)) + } +} + // U8FromRecord returns the uint8 value for a specific row and column in an // Arrow record. If the value is null, it returns 0. func U8FromRecord(record arrow.Record, fieldID int, row int) (uint8, error) { diff --git a/pkg/otel/common/otlp/attributes.go b/pkg/otel/common/otlp/attributes.go index 990a3013..638bc39f 100644 --- a/pkg/otel/common/otlp/attributes.go +++ b/pkg/otel/common/otlp/attributes.go @@ -20,6 +20,7 @@ package otlp import ( "github.com/apache/arrow/go/v17/arrow" "go.opentelemetry.io/collector/pdata/pcommon" + "golang.org/x/exp/constraints" arrowutils "github.com/open-telemetry/otel-arrow/pkg/arrow" "github.com/open-telemetry/otel-arrow/pkg/otel/common" @@ -45,57 +46,33 @@ type ( Ser int } - // Attributes16Store is a store for attributes. + // AttributesStore is a store for attributes. // The attributes are stored in a map by ID. This ID represents the // identifier of the main entity (span, event, link, etc.) to which the // attributes are attached. So the maximum number of attributes per entity // is not limited. - Attributes16Store struct { - lastID uint16 - attributesByID map[uint16]*pcommon.Map + AttributesStore[u constraints.Unsigned] struct { + lastID u + attributesByID map[u]*pcommon.Map } - // Attributes32Store is a store for attributes. - // The attributes are stored in a map by ID. This ID represents the - // identifier of the main entity (span, event, link, etc.) to which the - // attributes are attached. So the maximum number of attributes per entity - // is not limited. - Attributes32Store struct { - lastID uint32 - attributesByID map[uint32]*pcommon.Map - } - - Attrs16ParentIdDecoder struct { - prevParentID uint16 - prevKey string - prevValue *pcommon.Value - encodingType int - } - - Attrs32ParentIdDecoder struct { - prevParentID uint32 + AttrsParentIDDecoder[u constraints.Unsigned] struct { + prevParentID u prevKey string prevValue *pcommon.Value encodingType int } ) -// NewAttributes16Store creates a new Attributes16Store. -func NewAttributes16Store() *Attributes16Store { - return &Attributes16Store{ - attributesByID: make(map[uint16]*pcommon.Map), - } -} - -// NewAttributes32Store creates a new Attributes32Store. -func NewAttributes32Store() *Attributes32Store { - return &Attributes32Store{ - attributesByID: make(map[uint32]*pcommon.Map), +// NewAttributesStore creates a new AttributesStore. +func NewAttributesStore[u constraints.Unsigned]() *AttributesStore[u] { + return &AttributesStore[u]{ + attributesByID: make(map[u]*pcommon.Map), } } // AttributesByDeltaID returns the attributes for the given Delta ID. -func (s *Attributes16Store) AttributesByDeltaID(ID uint16) *pcommon.Map { +func (s *AttributesStore[u]) AttributesByDeltaID(ID u) *pcommon.Map { s.lastID += ID if m, ok := s.attributesByID[s.lastID]; ok { return m @@ -104,34 +81,17 @@ func (s *Attributes16Store) AttributesByDeltaID(ID uint16) *pcommon.Map { } // AttributesByID returns the attributes for the given ID. -func (s *Attributes16Store) AttributesByID(ID uint16) *pcommon.Map { +func (s *AttributesStore[u]) AttributesByID(ID u) *pcommon.Map { if m, ok := s.attributesByID[ID]; ok { return m } return nil } -// AttributesByID returns the attributes for the given ID. -func (s *Attributes32Store) AttributesByID(ID uint32) *pcommon.Map { - if m, ok := s.attributesByID[ID]; ok { - return m - } - return nil -} - -// AttributesByDeltaID returns the attributes for the given Delta ID. -func (s *Attributes32Store) AttributesByDeltaID(ID uint32) *pcommon.Map { - s.lastID += ID - if m, ok := s.attributesByID[s.lastID]; ok { - return m - } - return nil -} - -// Attributes16StoreFrom creates an Attribute16Store from an arrow.Record. +// AttributesStoreFrom creates an Attribute16Store from an arrow.Record. // Note: This function doesn't release the record passed as argument. This is // the responsibility of the caller -func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error { +func AttributesStoreFrom[unsigned constraints.Unsigned](record arrow.Record, store *AttributesStore[unsigned]) error { attrIDS, err := SchemaToAttributeIDs(record.Schema()) if err != nil { return werror.Wrap(err) @@ -139,7 +99,7 @@ func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error attrsCount := int(record.NumRows()) - parentIdDecoder := NewAttrs16ParentIdDecoder() + parentIDDecoder := NewAttrsParentIDDecoder[unsigned]() // Read all key/value tuples from the record and reconstruct the attributes // map by ID. @@ -153,7 +113,6 @@ func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error if err != nil { return werror.Wrap(err) } - value := pcommon.NewValueEmpty() switch pcommon.ValueType(vType) { case pcommon.ValueTypeStr: @@ -206,106 +165,12 @@ func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error // silently ignore unknown types to avoid DOS attacks } - deltaOrParentID, err := arrowutils.U16FromRecord(record, attrIDS.ParentID, i) + deltaOrParentID, err := arrowutils.UnsignedFromRecord[unsigned](record, attrIDS.ParentID, i) if err != nil { return werror.Wrap(err) } - parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value) - m, ok := store.attributesByID[parentID] - if !ok { - newMap := pcommon.NewMap() - m = &newMap - store.attributesByID[parentID] = m - } - value.CopyTo(m.PutEmpty(key)) - } - - return nil -} - -// Attributes32StoreFrom creates an Attributes32Store from an arrow.Record. -// Note: This function doesn't release the record passed as argument. This is -// the responsibility of the caller -func Attributes32StoreFrom(record arrow.Record, store *Attributes32Store) error { - attrIDS, err := SchemaToAttributeIDs(record.Schema()) - if err != nil { - return werror.Wrap(err) - } - - attrsCount := int(record.NumRows()) - - parentIdDecoder := NewAttrs32ParentIdDecoder() - - // Read all key/value tuples from the record and reconstruct the attributes - // map by ID. - for i := 0; i < attrsCount; i++ { - key, err := arrowutils.StringFromRecord(record, attrIDS.Key, i) - if err != nil { - return werror.Wrap(err) - } - - vType, err := arrowutils.U8FromRecord(record, attrIDS.Type, i) - if err != nil { - return werror.Wrap(err) - } - value := pcommon.NewValueEmpty() - switch pcommon.ValueType(vType) { - case pcommon.ValueTypeStr: - v, err := arrowutils.StringFromRecord(record, attrIDS.Str, i) - if err != nil { - return werror.Wrap(err) - } - value.SetStr(v) - case pcommon.ValueTypeInt: - v, err := arrowutils.I64FromRecord(record, attrIDS.Int, i) - if err != nil { - return werror.Wrap(err) - } - value.SetInt(v) - case pcommon.ValueTypeDouble: - v, err := arrowutils.F64FromRecord(record, attrIDS.Double, i) - if err != nil { - return werror.Wrap(err) - } - value.SetDouble(v) - case pcommon.ValueTypeBool: - v, err := arrowutils.BoolFromRecord(record, attrIDS.Bool, i) - if err != nil { - return werror.Wrap(err) - } - value.SetBool(v) - case pcommon.ValueTypeBytes: - v, err := arrowutils.BinaryFromRecord(record, attrIDS.Bytes, i) - if err != nil { - return werror.Wrap(err) - } - value.SetEmptyBytes().FromRaw(v) - case pcommon.ValueTypeSlice: - v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) - if err != nil { - return werror.Wrap(err) - } - if err = common.Deserialize(v, value); err != nil { - return werror.Wrap(err) - } - case pcommon.ValueTypeMap: - v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i) - if err != nil { - return werror.Wrap(err) - } - if err = common.Deserialize(v, value); err != nil { - return werror.Wrap(err) - } - default: - // silently ignore unknown types to avoid DOS attacks - } - - deltaOrParentID, err := arrowutils.U32FromRecord(record, attrIDS.ParentID, i) - if err != nil { - return werror.Wrap(err) - } - parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value) + parentID := parentIDDecoder.Decode(deltaOrParentID, key, &value) m, ok := store.attributesByID[parentID] if !ok { @@ -380,43 +245,13 @@ func SchemaToAttributeIDs(schema *arrow.Schema) (*AttributeIDs, error) { }, nil } -func NewAttrs16ParentIdDecoder() *Attrs16ParentIdDecoder { - return &Attrs16ParentIdDecoder{ - encodingType: carrow.ParentIdDeltaGroupEncoding, - } -} - -func (d *Attrs16ParentIdDecoder) Decode(deltaOrParentID uint16, key string, value *pcommon.Value) uint16 { - switch d.encodingType { - case carrow.ParentIdNoEncoding: - return deltaOrParentID - case carrow.ParentIdDeltaEncoding: - decodedParentID := d.prevParentID + deltaOrParentID - d.prevParentID = decodedParentID - return decodedParentID - case carrow.ParentIdDeltaGroupEncoding: - if d.prevKey == key && carrow.Equal(d.prevValue, value) { - parentID := d.prevParentID + deltaOrParentID - d.prevParentID = parentID - return parentID - } else { - d.prevKey = key - d.prevValue = value - d.prevParentID = deltaOrParentID - return deltaOrParentID - } - default: - panic("unknown attrs16 parent ID encoding type") - } -} - -func NewAttrs32ParentIdDecoder() *Attrs32ParentIdDecoder { - return &Attrs32ParentIdDecoder{ +func NewAttrsParentIDDecoder[u constraints.Unsigned]() *AttrsParentIDDecoder[u] { + return &AttrsParentIDDecoder[u]{ encodingType: carrow.ParentIdDeltaGroupEncoding, } } -func (d *Attrs32ParentIdDecoder) Decode(deltaOrParentID uint32, key string, value *pcommon.Value) uint32 { +func (d *AttrsParentIDDecoder[unsigned]) Decode(deltaOrParentID unsigned, key string, value *pcommon.Value) unsigned { switch d.encodingType { case carrow.ParentIdNoEncoding: return deltaOrParentID @@ -436,6 +271,6 @@ func (d *Attrs32ParentIdDecoder) Decode(deltaOrParentID uint32, key string, valu return deltaOrParentID } default: - panic("unknown attrs32 parent ID encoding type") + panic("unknown parent ID encoding type") } } diff --git a/pkg/otel/common/otlp/resource.go b/pkg/otel/common/otlp/resource.go index 7069224b..81b61912 100644 --- a/pkg/otel/common/otlp/resource.go +++ b/pkg/otel/common/otlp/resource.go @@ -48,7 +48,7 @@ func NewResourceIdsFromSchema(schema *arrow.Schema) (*ResourceIds, error) { }, nil } -func UpdateResourceFromRecord(r pcommon.Resource, record arrow.Record, row int, resIds *ResourceIds, attrsStore *Attributes16Store) (schemaUrl string, err error) { +func UpdateResourceFromRecord(r pcommon.Resource, record arrow.Record, row int, resIds *ResourceIds, attrsStore *AttributesStore[uint16]) (schemaUrl string, err error) { resArr, err := arrowutils.StructFromRecord(record, resIds.Resource, row) if err != nil { return "", werror.WrapWithContext(err, map[string]interface{}{"row": row}) diff --git a/pkg/otel/common/otlp/scope.go b/pkg/otel/common/otlp/scope.go index 7f7afb5a..714ed2bc 100644 --- a/pkg/otel/common/otlp/scope.go +++ b/pkg/otel/common/otlp/scope.go @@ -55,7 +55,7 @@ func UpdateScopeFromRecord( record arrow.Record, row int, ids *ScopeIds, - attrsStore *Attributes16Store, + attrsStore *AttributesStore[uint16], ) error { scopeArray, err := arrowutils.StructFromRecord(record, ids.Scope, row) if err != nil { diff --git a/pkg/otel/logs/otlp/related_data.go b/pkg/otel/logs/otlp/related_data.go index ac2203b4..7e4c6f8b 100644 --- a/pkg/otel/logs/otlp/related_data.go +++ b/pkg/otel/logs/otlp/related_data.go @@ -30,17 +30,17 @@ import ( type ( RelatedData struct { LogRecordID uint16 - ResAttrMapStore *otlp.Attributes16Store - ScopeAttrMapStore *otlp.Attributes16Store - LogRecordAttrMapStore *otlp.Attributes16Store + ResAttrMapStore *otlp.AttributesStore[uint16] + ScopeAttrMapStore *otlp.AttributesStore[uint16] + LogRecordAttrMapStore *otlp.AttributesStore[uint16] } ) func NewRelatedData() *RelatedData { return &RelatedData{ - ResAttrMapStore: otlp.NewAttributes16Store(), - ScopeAttrMapStore: otlp.NewAttributes16Store(), - LogRecordAttrMapStore: otlp.NewAttributes16Store(), + ResAttrMapStore: otlp.NewAttributesStore[uint16](), + ScopeAttrMapStore: otlp.NewAttributesStore[uint16](), + LogRecordAttrMapStore: otlp.NewAttributesStore[uint16](), } } @@ -62,17 +62,17 @@ func RelatedDataFrom(records []*record_message.RecordMessage) (relatedData *Rela for _, record := range records { switch record.PayloadType() { case colarspb.ArrowPayloadType_RESOURCE_ATTRS: - err = otlp.Attributes16StoreFrom(record.Record(), relatedData.ResAttrMapStore) + err = otlp.AttributesStoreFrom[uint16](record.Record(), relatedData.ResAttrMapStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_SCOPE_ATTRS: - err = otlp.Attributes16StoreFrom(record.Record(), relatedData.ScopeAttrMapStore) + err = otlp.AttributesStoreFrom[uint16](record.Record(), relatedData.ScopeAttrMapStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_LOG_ATTRS: - err = otlp.Attributes16StoreFrom(record.Record(), relatedData.LogRecordAttrMapStore) + err = otlp.AttributesStoreFrom[uint16](record.Record(), relatedData.LogRecordAttrMapStore) if err != nil { return nil, nil, werror.Wrap(err) } diff --git a/pkg/otel/metrics/otlp/ehistogram.go b/pkg/otel/metrics/otlp/ehistogram.go index ca770068..9668b9a7 100644 --- a/pkg/otel/metrics/otlp/ehistogram.go +++ b/pkg/otel/metrics/otlp/ehistogram.go @@ -149,7 +149,7 @@ func SchemaToEHistogramIDs(schema *arrow.Schema) (*EHistogramDataPointIDs, error // // Important Note: This function doesn't take ownership of the record. The // caller is responsible for releasing it. -func EHistogramDataPointsStoreFrom(record arrow.Record, exemplarsStore *ExemplarsStore, attrsStore *otlp.Attributes32Store) (*EHistogramDataPointsStore, error) { +func EHistogramDataPointsStoreFrom(record arrow.Record, exemplarsStore *ExemplarsStore, attrsStore *otlp.AttributesStore[uint32]) (*EHistogramDataPointsStore, error) { store := &EHistogramDataPointsStore{ dataPointsByID: make(map[uint16]pmetric.ExponentialHistogramDataPointSlice), } diff --git a/pkg/otel/metrics/otlp/exemplar.go b/pkg/otel/metrics/otlp/exemplar.go index 8de11211..8938559c 100644 --- a/pkg/otel/metrics/otlp/exemplar.go +++ b/pkg/otel/metrics/otlp/exemplar.go @@ -107,7 +107,7 @@ func (s *ExemplarsStore) ExemplarsByID(ID uint32) pmetric.ExemplarSlice { // caller is responsible for releasing it. func ExemplarsStoreFrom( record arrow.Record, - attrsStore *otlp.Attributes32Store, + attrsStore *otlp.AttributesStore[uint32], ) (*ExemplarsStore, error) { store := &ExemplarsStore{ exemplarsByIDs: make(map[uint32]pmetric.ExemplarSlice), diff --git a/pkg/otel/metrics/otlp/histogram.go b/pkg/otel/metrics/otlp/histogram.go index 437b7192..748f7208 100644 --- a/pkg/otel/metrics/otlp/histogram.go +++ b/pkg/otel/metrics/otlp/histogram.go @@ -136,7 +136,7 @@ func SchemaToHistogramIDs(schema *arrow.Schema) (*HistogramDataPointIDs, error) // // Important Note: This function doesn't take ownership of the record. The // caller is responsible for releasing it. -func HistogramDataPointsStoreFrom(record arrow.Record, exemplarsStore *ExemplarsStore, attrsStore *otlp.Attributes32Store) (*HistogramDataPointsStore, error) { +func HistogramDataPointsStoreFrom(record arrow.Record, exemplarsStore *ExemplarsStore, attrsStore *otlp.AttributesStore[uint32]) (*HistogramDataPointsStore, error) { store := &HistogramDataPointsStore{ dataPointsByID: make(map[uint16]pmetric.HistogramDataPointSlice), } diff --git a/pkg/otel/metrics/otlp/number_data_point.go b/pkg/otel/metrics/otlp/number_data_point.go index 97a80b20..3b4bd1c5 100644 --- a/pkg/otel/metrics/otlp/number_data_point.go +++ b/pkg/otel/metrics/otlp/number_data_point.go @@ -20,7 +20,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" arrowutils "github.com/open-telemetry/otel-arrow/pkg/arrow" - otlp "github.com/open-telemetry/otel-arrow/pkg/otel/common/otlp" + "github.com/open-telemetry/otel-arrow/pkg/otel/common/otlp" "github.com/open-telemetry/otel-arrow/pkg/otel/constants" "github.com/open-telemetry/otel-arrow/pkg/werror" ) @@ -107,7 +107,7 @@ func SchemaToNDPIDs(schema *arrow.Schema) (*NumberDataPointIDs, error) { // // Important Note: This function doesn't take ownership of the record. The // caller is responsible for releasing it. -func NumberDataPointsStoreFrom(record arrow.Record, exemplarsStore *ExemplarsStore, attrsStore *otlp.Attributes32Store) (*NumberDataPointsStore, error) { +func NumberDataPointsStoreFrom(record arrow.Record, exemplarsStore *ExemplarsStore, attrsStore *otlp.AttributesStore[uint32]) (*NumberDataPointsStore, error) { store := &NumberDataPointsStore{ dataPointsByID: make(map[uint16]pmetric.NumberDataPointSlice), } diff --git a/pkg/otel/metrics/otlp/related_data.go b/pkg/otel/metrics/otlp/related_data.go index 2958351d..1c85d8fc 100644 --- a/pkg/otel/metrics/otlp/related_data.go +++ b/pkg/otel/metrics/otlp/related_data.go @@ -30,15 +30,15 @@ type ( MetricID uint16 // Attributes stores - ResAttrMapStore *otlp.Attributes16Store - ScopeAttrMapStore *otlp.Attributes16Store - NumberDPAttrsStore *otlp.Attributes32Store - SummaryAttrsStore *otlp.Attributes32Store - HistogramAttrsStore *otlp.Attributes32Store - ExpHistogramAttrsStore *otlp.Attributes32Store - NumberDPExemplarAttrsStore *otlp.Attributes32Store - HistogramExemplarAttrsStore *otlp.Attributes32Store - ExpHistogramExemplarAttrsStore *otlp.Attributes32Store + ResAttrMapStore *otlp.AttributesStore[uint16] + ScopeAttrMapStore *otlp.AttributesStore[uint16] + NumberDPAttrsStore *otlp.AttributesStore[uint32] + SummaryAttrsStore *otlp.AttributesStore[uint32] + HistogramAttrsStore *otlp.AttributesStore[uint32] + ExpHistogramAttrsStore *otlp.AttributesStore[uint32] + NumberDPExemplarAttrsStore *otlp.AttributesStore[uint32] + HistogramExemplarAttrsStore *otlp.AttributesStore[uint32] + ExpHistogramExemplarAttrsStore *otlp.AttributesStore[uint32] // Metric stores NumberDataPointsStore *NumberDataPointsStore @@ -55,15 +55,15 @@ type ( func NewRelatedData() *RelatedData { return &RelatedData{ - ResAttrMapStore: otlp.NewAttributes16Store(), - ScopeAttrMapStore: otlp.NewAttributes16Store(), - NumberDPAttrsStore: otlp.NewAttributes32Store(), - SummaryAttrsStore: otlp.NewAttributes32Store(), - HistogramAttrsStore: otlp.NewAttributes32Store(), - ExpHistogramAttrsStore: otlp.NewAttributes32Store(), - NumberDPExemplarAttrsStore: otlp.NewAttributes32Store(), - HistogramExemplarAttrsStore: otlp.NewAttributes32Store(), - ExpHistogramExemplarAttrsStore: otlp.NewAttributes32Store(), + ResAttrMapStore: otlp.NewAttributesStore[uint16](), + ScopeAttrMapStore: otlp.NewAttributesStore[uint16](), + NumberDPAttrsStore: otlp.NewAttributesStore[uint32](), + SummaryAttrsStore: otlp.NewAttributesStore[uint32](), + HistogramAttrsStore: otlp.NewAttributesStore[uint32](), + ExpHistogramAttrsStore: otlp.NewAttributesStore[uint32](), + NumberDPExemplarAttrsStore: otlp.NewAttributesStore[uint32](), + HistogramExemplarAttrsStore: otlp.NewAttributesStore[uint32](), + ExpHistogramExemplarAttrsStore: otlp.NewAttributesStore[uint32](), NumberDataPointsStore: NewNumberDataPointsStore(), SummaryDataPointsStore: NewSummaryDataPointsStore(), @@ -101,32 +101,32 @@ func RelatedDataFrom(records []*record_message.RecordMessage) (relatedData *Rela for _, record := range records { switch record.PayloadType() { case colarspb.ArrowPayloadType_RESOURCE_ATTRS: - err = otlp.Attributes16StoreFrom(record.Record(), relatedData.ResAttrMapStore) + err = otlp.AttributesStoreFrom(record.Record(), relatedData.ResAttrMapStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_SCOPE_ATTRS: - err = otlp.Attributes16StoreFrom(record.Record(), relatedData.ScopeAttrMapStore) + err = otlp.AttributesStoreFrom(record.Record(), relatedData.ScopeAttrMapStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_NUMBER_DP_ATTRS: - err = otlp.Attributes32StoreFrom(record.Record(), relatedData.NumberDPAttrsStore) + err = otlp.AttributesStoreFrom(record.Record(), relatedData.NumberDPAttrsStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_SUMMARY_DP_ATTRS: - err = otlp.Attributes32StoreFrom(record.Record(), relatedData.SummaryAttrsStore) + err = otlp.AttributesStoreFrom(record.Record(), relatedData.SummaryAttrsStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_HISTOGRAM_DP_ATTRS: - err = otlp.Attributes32StoreFrom(record.Record(), relatedData.HistogramAttrsStore) + err = otlp.AttributesStoreFrom(record.Record(), relatedData.HistogramAttrsStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_EXP_HISTOGRAM_DP_ATTRS: - err = otlp.Attributes32StoreFrom(record.Record(), relatedData.ExpHistogramAttrsStore) + err = otlp.AttributesStoreFrom(record.Record(), relatedData.ExpHistogramAttrsStore) if err != nil { return nil, nil, werror.Wrap(err) } @@ -171,17 +171,17 @@ func RelatedDataFrom(records []*record_message.RecordMessage) (relatedData *Rela } expHistogramDBExRec = record case colarspb.ArrowPayloadType_NUMBER_DP_EXEMPLAR_ATTRS: - err = otlp.Attributes32StoreFrom(record.Record(), relatedData.NumberDPExemplarAttrsStore) + err = otlp.AttributesStoreFrom[uint32](record.Record(), relatedData.NumberDPExemplarAttrsStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_HISTOGRAM_DP_EXEMPLAR_ATTRS: - err = otlp.Attributes32StoreFrom(record.Record(), relatedData.HistogramExemplarAttrsStore) + err = otlp.AttributesStoreFrom[uint32](record.Record(), relatedData.HistogramExemplarAttrsStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_EXP_HISTOGRAM_DP_EXEMPLAR_ATTRS: - err = otlp.Attributes32StoreFrom(record.Record(), relatedData.ExpHistogramExemplarAttrsStore) + err = otlp.AttributesStoreFrom[uint32](record.Record(), relatedData.ExpHistogramExemplarAttrsStore) if err != nil { return nil, nil, werror.Wrap(err) } diff --git a/pkg/otel/metrics/otlp/summary.go b/pkg/otel/metrics/otlp/summary.go index 21320402..8bd14146 100644 --- a/pkg/otel/metrics/otlp/summary.go +++ b/pkg/otel/metrics/otlp/summary.go @@ -114,7 +114,7 @@ func SchemaToSummaryIDs(schema *arrow.Schema) (*SummaryDataPointIDs, error) { // // Important Note: This function doesn't take ownership of the record. The // caller is responsible for releasing it. -func SummaryDataPointsStoreFrom(record arrow.Record, attrsStore *otlp.Attributes32Store) (*SummaryDataPointsStore, error) { +func SummaryDataPointsStoreFrom(record arrow.Record, attrsStore *otlp.AttributesStore[uint32]) (*SummaryDataPointsStore, error) { store := &SummaryDataPointsStore{ dataPointsByID: make(map[uint16]pmetric.SummaryDataPointSlice), } diff --git a/pkg/otel/traces/otlp/event.go b/pkg/otel/traces/otlp/event.go index 673989f3..ea56d940 100644 --- a/pkg/otel/traces/otlp/event.go +++ b/pkg/otel/traces/otlp/event.go @@ -76,7 +76,7 @@ func (s *SpanEventsStore) EventsByID(ID uint16) []*ptrace.SpanEvent { // caller is responsible for releasing it. func SpanEventsStoreFrom( record arrow.Record, - attrsStore *otlp.Attributes32Store, + attrsStore *otlp.AttributesStore[uint32], conf *tarrow.EventConfig, ) (*SpanEventsStore, error) { store := &SpanEventsStore{ diff --git a/pkg/otel/traces/otlp/link.go b/pkg/otel/traces/otlp/link.go index d656b194..d273a8b6 100644 --- a/pkg/otel/traces/otlp/link.go +++ b/pkg/otel/traces/otlp/link.go @@ -23,7 +23,7 @@ import ( arrowutils "github.com/open-telemetry/otel-arrow/pkg/arrow" carrow "github.com/open-telemetry/otel-arrow/pkg/otel/common/arrow" - otlp "github.com/open-telemetry/otel-arrow/pkg/otel/common/otlp" + "github.com/open-telemetry/otel-arrow/pkg/otel/common/otlp" "github.com/open-telemetry/otel-arrow/pkg/otel/constants" tarrow "github.com/open-telemetry/otel-arrow/pkg/otel/traces/arrow" "github.com/open-telemetry/otel-arrow/pkg/werror" @@ -77,7 +77,7 @@ func (s *SpanLinksStore) LinksByID(ID uint16) []*ptrace.SpanLink { // caller is responsible for releasing it. func SpanLinksStoreFrom( record arrow.Record, - attrsStore *otlp.Attributes32Store, + attrsStore *otlp.AttributesStore[uint32], conf *tarrow.LinkConfig, ) (*SpanLinksStore, error) { store := &SpanLinksStore{ diff --git a/pkg/otel/traces/otlp/related_data.go b/pkg/otel/traces/otlp/related_data.go index 3e9a50dd..5eefa103 100644 --- a/pkg/otel/traces/otlp/related_data.go +++ b/pkg/otel/traces/otlp/related_data.go @@ -31,11 +31,11 @@ import ( type ( RelatedData struct { SpanID uint16 - ResAttrMapStore *otlp.Attributes16Store - ScopeAttrMapStore *otlp.Attributes16Store - SpanAttrMapStore *otlp.Attributes16Store - SpanEventAttrMapStore *otlp.Attributes32Store - SpanLinkAttrMapStore *otlp.Attributes32Store + ResAttrMapStore *otlp.AttributesStore[uint16] + ScopeAttrMapStore *otlp.AttributesStore[uint16] + SpanAttrMapStore *otlp.AttributesStore[uint16] + SpanEventAttrMapStore *otlp.AttributesStore[uint32] + SpanLinkAttrMapStore *otlp.AttributesStore[uint32] SpanEventsStore *SpanEventsStore SpanLinksStore *SpanLinksStore } @@ -43,11 +43,11 @@ type ( func NewRelatedData(conf *arrow.Config) *RelatedData { return &RelatedData{ - ResAttrMapStore: otlp.NewAttributes16Store(), - ScopeAttrMapStore: otlp.NewAttributes16Store(), - SpanAttrMapStore: otlp.NewAttributes16Store(), - SpanEventAttrMapStore: otlp.NewAttributes32Store(), - SpanLinkAttrMapStore: otlp.NewAttributes32Store(), + ResAttrMapStore: otlp.NewAttributesStore[uint16](), + ScopeAttrMapStore: otlp.NewAttributesStore[uint16](), + SpanAttrMapStore: otlp.NewAttributesStore[uint16](), + SpanEventAttrMapStore: otlp.NewAttributesStore[uint32](), + SpanLinkAttrMapStore: otlp.NewAttributesStore[uint32](), SpanEventsStore: NewSpanEventsStore(conf.Event), SpanLinksStore: NewSpanLinksStore(), } @@ -75,7 +75,7 @@ func RelatedDataFrom(records []*record_message.RecordMessage, conf *arrow.Config for _, record := range records { switch record.PayloadType() { case colarspb.ArrowPayloadType_RESOURCE_ATTRS: - err = otlp.Attributes16StoreFrom( + err = otlp.AttributesStoreFrom[uint16]( record.Record(), relatedData.ResAttrMapStore, ) @@ -83,12 +83,12 @@ func RelatedDataFrom(records []*record_message.RecordMessage, conf *arrow.Config return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_SCOPE_ATTRS: - err = otlp.Attributes16StoreFrom(record.Record(), relatedData.ScopeAttrMapStore) + err = otlp.AttributesStoreFrom[uint16](record.Record(), relatedData.ScopeAttrMapStore) if err != nil { return nil, nil, werror.Wrap(err) } case colarspb.ArrowPayloadType_SPAN_ATTRS: - err = otlp.Attributes16StoreFrom(record.Record(), relatedData.SpanAttrMapStore) + err = otlp.AttributesStoreFrom[uint16](record.Record(), relatedData.SpanAttrMapStore) if err != nil { return nil, nil, werror.Wrap(err) } @@ -98,7 +98,7 @@ func RelatedDataFrom(records []*record_message.RecordMessage, conf *arrow.Config } spanEventRecord = record case colarspb.ArrowPayloadType_SPAN_EVENT_ATTRS: - err = otlp.Attributes32StoreFrom(record.Record(), relatedData.SpanEventAttrMapStore) + err = otlp.AttributesStoreFrom[uint32](record.Record(), relatedData.SpanEventAttrMapStore) if err != nil { return nil, nil, werror.Wrap(err) } @@ -108,7 +108,7 @@ func RelatedDataFrom(records []*record_message.RecordMessage, conf *arrow.Config } spanLinkRecord = record case colarspb.ArrowPayloadType_SPAN_LINK_ATTRS: - err = otlp.Attributes32StoreFrom(record.Record(), relatedData.SpanLinkAttrMapStore) + err = otlp.AttributesStoreFrom[uint32](record.Record(), relatedData.SpanLinkAttrMapStore) if err != nil { return nil, nil, werror.Wrap(err) }