Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg: Create generic AttributesStore #276

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions pkg/arrow/from_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,69 @@ package arrow
// Utility functions to extract values from Arrow Records.

import (
"fmt"

"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"golang.org/x/exp/constraints"

"github.com/open-telemetry/otel-arrow/pkg/otel/common"
"github.com/open-telemetry/otel-arrow/pkg/werror"
)

// UnsignedFromRecord returns the unsigned value for a specific row and column in an
// Arrow record. If the value is null, it returns 0.
func UnsignedFromRecord[unsigned constraints.Unsigned](record arrow.Record, fieldID int, row int) (unsigned, error) {
if fieldID == AbsentFieldID {
return 0, nil
}

arr := record.Column(fieldID)
if arr == nil {
return 0, nil
}

switch arr := arr.(type) {
case *array.Dictionary:
switch dict := arr.Dictionary().(type) {
case *array.Uint32:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(dict.Value(arr.GetValueIndex(row))), nil
}
default:
return 0, werror.WrapWithMsg(ErrInvalidArrayType, fmt.Sprintf("dictionary of type %T is unsupported", dict))
}
case *array.Uint8:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(arr.Value(row)), nil
}
case *array.Uint16:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(arr.Value(row)), nil
}
case *array.Uint32:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(arr.Value(row)), nil
}
case *array.Uint64:
if arr.IsNull(row) {
return 0, nil
} else {
return unsigned(arr.Value(row)), nil
}
default:
return 0, werror.WrapWithMsg(ErrInvalidArrayType, fmt.Sprintf("array of type %T is unsupported", arr))
}
}

// U8FromRecord returns the uint8 value for a specific row and column in an
// Arrow record. If the value is null, it returns 0.
func U8FromRecord(record arrow.Record, fieldID int, row int) (uint8, error) {
Expand Down
209 changes: 22 additions & 187 deletions pkg/otel/common/otlp/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package otlp
import (
"github.com/apache/arrow/go/v17/arrow"
"go.opentelemetry.io/collector/pdata/pcommon"
"golang.org/x/exp/constraints"

arrowutils "github.com/open-telemetry/otel-arrow/pkg/arrow"
"github.com/open-telemetry/otel-arrow/pkg/otel/common"
Expand All @@ -45,57 +46,33 @@ type (
Ser int
}

// Attributes16Store is a store for attributes.
// AttributesStore is a store for attributes.
// The attributes are stored in a map by ID. This ID represents the
// identifier of the main entity (span, event, link, etc.) to which the
// attributes are attached. So the maximum number of attributes per entity
// is not limited.
Attributes16Store struct {
lastID uint16
attributesByID map[uint16]*pcommon.Map
AttributesStore[u constraints.Unsigned] struct {
lastID u
attributesByID map[u]*pcommon.Map
}

// Attributes32Store is a store for attributes.
// The attributes are stored in a map by ID. This ID represents the
// identifier of the main entity (span, event, link, etc.) to which the
// attributes are attached. So the maximum number of attributes per entity
// is not limited.
Attributes32Store struct {
lastID uint32
attributesByID map[uint32]*pcommon.Map
}

Attrs16ParentIdDecoder struct {
prevParentID uint16
prevKey string
prevValue *pcommon.Value
encodingType int
}

Attrs32ParentIdDecoder struct {
prevParentID uint32
AttrsParentIDDecoder[u constraints.Unsigned] struct {
prevParentID u
prevKey string
prevValue *pcommon.Value
encodingType int
}
)

// NewAttributes16Store creates a new Attributes16Store.
func NewAttributes16Store() *Attributes16Store {
return &Attributes16Store{
attributesByID: make(map[uint16]*pcommon.Map),
}
}

// NewAttributes32Store creates a new Attributes32Store.
func NewAttributes32Store() *Attributes32Store {
return &Attributes32Store{
attributesByID: make(map[uint32]*pcommon.Map),
// NewAttributesStore creates a new AttributesStore.
func NewAttributesStore[u constraints.Unsigned]() *AttributesStore[u] {
return &AttributesStore[u]{
attributesByID: make(map[u]*pcommon.Map),
}
}

// AttributesByDeltaID returns the attributes for the given Delta ID.
func (s *Attributes16Store) AttributesByDeltaID(ID uint16) *pcommon.Map {
func (s *AttributesStore[u]) AttributesByDeltaID(ID u) *pcommon.Map {
s.lastID += ID
if m, ok := s.attributesByID[s.lastID]; ok {
return m
Expand All @@ -104,42 +81,25 @@ func (s *Attributes16Store) AttributesByDeltaID(ID uint16) *pcommon.Map {
}

// AttributesByID returns the attributes for the given ID.
func (s *Attributes16Store) AttributesByID(ID uint16) *pcommon.Map {
func (s *AttributesStore[u]) AttributesByID(ID u) *pcommon.Map {
if m, ok := s.attributesByID[ID]; ok {
return m
}
return nil
}

// AttributesByID returns the attributes for the given ID.
func (s *Attributes32Store) AttributesByID(ID uint32) *pcommon.Map {
if m, ok := s.attributesByID[ID]; ok {
return m
}
return nil
}

// AttributesByDeltaID returns the attributes for the given Delta ID.
func (s *Attributes32Store) AttributesByDeltaID(ID uint32) *pcommon.Map {
s.lastID += ID
if m, ok := s.attributesByID[s.lastID]; ok {
return m
}
return nil
}

// Attributes16StoreFrom creates an Attribute16Store from an arrow.Record.
// AttributesStoreFrom creates an Attribute16Store from an arrow.Record.
// Note: This function doesn't release the record passed as argument. This is
// the responsibility of the caller
func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error {
func AttributesStoreFrom[unsigned constraints.Unsigned](record arrow.Record, store *AttributesStore[unsigned]) error {
attrIDS, err := SchemaToAttributeIDs(record.Schema())
if err != nil {
return werror.Wrap(err)
}

attrsCount := int(record.NumRows())

parentIdDecoder := NewAttrs16ParentIdDecoder()
parentIDDecoder := NewAttrsParentIDDecoder[unsigned]()

// Read all key/value tuples from the record and reconstruct the attributes
// map by ID.
Expand All @@ -153,7 +113,6 @@ func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error
if err != nil {
return werror.Wrap(err)
}

value := pcommon.NewValueEmpty()
switch pcommon.ValueType(vType) {
case pcommon.ValueTypeStr:
Expand Down Expand Up @@ -206,106 +165,12 @@ func Attributes16StoreFrom(record arrow.Record, store *Attributes16Store) error
// silently ignore unknown types to avoid DOS attacks
}

deltaOrParentID, err := arrowutils.U16FromRecord(record, attrIDS.ParentID, i)
deltaOrParentID, err := arrowutils.UnsignedFromRecord[unsigned](record, attrIDS.ParentID, i)
if err != nil {
return werror.Wrap(err)
}
parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value)

m, ok := store.attributesByID[parentID]
if !ok {
newMap := pcommon.NewMap()
m = &newMap
store.attributesByID[parentID] = m
}
value.CopyTo(m.PutEmpty(key))
}

return nil
}

// Attributes32StoreFrom creates an Attributes32Store from an arrow.Record.
// Note: This function doesn't release the record passed as argument. This is
// the responsibility of the caller
func Attributes32StoreFrom(record arrow.Record, store *Attributes32Store) error {
attrIDS, err := SchemaToAttributeIDs(record.Schema())
if err != nil {
return werror.Wrap(err)
}

attrsCount := int(record.NumRows())

parentIdDecoder := NewAttrs32ParentIdDecoder()

// Read all key/value tuples from the record and reconstruct the attributes
// map by ID.
for i := 0; i < attrsCount; i++ {
key, err := arrowutils.StringFromRecord(record, attrIDS.Key, i)
if err != nil {
return werror.Wrap(err)
}

vType, err := arrowutils.U8FromRecord(record, attrIDS.Type, i)
if err != nil {
return werror.Wrap(err)
}
value := pcommon.NewValueEmpty()
switch pcommon.ValueType(vType) {
case pcommon.ValueTypeStr:
v, err := arrowutils.StringFromRecord(record, attrIDS.Str, i)
if err != nil {
return werror.Wrap(err)
}
value.SetStr(v)
case pcommon.ValueTypeInt:
v, err := arrowutils.I64FromRecord(record, attrIDS.Int, i)
if err != nil {
return werror.Wrap(err)
}
value.SetInt(v)
case pcommon.ValueTypeDouble:
v, err := arrowutils.F64FromRecord(record, attrIDS.Double, i)
if err != nil {
return werror.Wrap(err)
}
value.SetDouble(v)
case pcommon.ValueTypeBool:
v, err := arrowutils.BoolFromRecord(record, attrIDS.Bool, i)
if err != nil {
return werror.Wrap(err)
}
value.SetBool(v)
case pcommon.ValueTypeBytes:
v, err := arrowutils.BinaryFromRecord(record, attrIDS.Bytes, i)
if err != nil {
return werror.Wrap(err)
}
value.SetEmptyBytes().FromRaw(v)
case pcommon.ValueTypeSlice:
v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i)
if err != nil {
return werror.Wrap(err)
}
if err = common.Deserialize(v, value); err != nil {
return werror.Wrap(err)
}
case pcommon.ValueTypeMap:
v, err := arrowutils.BinaryFromRecord(record, attrIDS.Ser, i)
if err != nil {
return werror.Wrap(err)
}
if err = common.Deserialize(v, value); err != nil {
return werror.Wrap(err)
}
default:
// silently ignore unknown types to avoid DOS attacks
}

deltaOrParentID, err := arrowutils.U32FromRecord(record, attrIDS.ParentID, i)
if err != nil {
return werror.Wrap(err)
}
parentID := parentIdDecoder.Decode(deltaOrParentID, key, &value)
parentID := parentIDDecoder.Decode(deltaOrParentID, key, &value)

m, ok := store.attributesByID[parentID]
if !ok {
Expand Down Expand Up @@ -380,43 +245,13 @@ func SchemaToAttributeIDs(schema *arrow.Schema) (*AttributeIDs, error) {
}, nil
}

func NewAttrs16ParentIdDecoder() *Attrs16ParentIdDecoder {
return &Attrs16ParentIdDecoder{
encodingType: carrow.ParentIdDeltaGroupEncoding,
}
}

func (d *Attrs16ParentIdDecoder) Decode(deltaOrParentID uint16, key string, value *pcommon.Value) uint16 {
switch d.encodingType {
case carrow.ParentIdNoEncoding:
return deltaOrParentID
case carrow.ParentIdDeltaEncoding:
decodedParentID := d.prevParentID + deltaOrParentID
d.prevParentID = decodedParentID
return decodedParentID
case carrow.ParentIdDeltaGroupEncoding:
if d.prevKey == key && carrow.Equal(d.prevValue, value) {
parentID := d.prevParentID + deltaOrParentID
d.prevParentID = parentID
return parentID
} else {
d.prevKey = key
d.prevValue = value
d.prevParentID = deltaOrParentID
return deltaOrParentID
}
default:
panic("unknown attrs16 parent ID encoding type")
}
}

func NewAttrs32ParentIdDecoder() *Attrs32ParentIdDecoder {
return &Attrs32ParentIdDecoder{
func NewAttrsParentIDDecoder[u constraints.Unsigned]() *AttrsParentIDDecoder[u] {
return &AttrsParentIDDecoder[u]{
encodingType: carrow.ParentIdDeltaGroupEncoding,
}
}

func (d *Attrs32ParentIdDecoder) Decode(deltaOrParentID uint32, key string, value *pcommon.Value) uint32 {
func (d *AttrsParentIDDecoder[unsigned]) Decode(deltaOrParentID unsigned, key string, value *pcommon.Value) unsigned {
switch d.encodingType {
case carrow.ParentIdNoEncoding:
return deltaOrParentID
Expand All @@ -436,6 +271,6 @@ func (d *Attrs32ParentIdDecoder) Decode(deltaOrParentID uint32, key string, valu
return deltaOrParentID
}
default:
panic("unknown attrs32 parent ID encoding type")
panic("unknown parent ID encoding type")
}
}
2 changes: 1 addition & 1 deletion pkg/otel/common/otlp/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewResourceIdsFromSchema(schema *arrow.Schema) (*ResourceIds, error) {
}, nil
}

func UpdateResourceFromRecord(r pcommon.Resource, record arrow.Record, row int, resIds *ResourceIds, attrsStore *Attributes16Store) (schemaUrl string, err error) {
func UpdateResourceFromRecord(r pcommon.Resource, record arrow.Record, row int, resIds *ResourceIds, attrsStore *AttributesStore[uint16]) (schemaUrl string, err error) {
resArr, err := arrowutils.StructFromRecord(record, resIds.Resource, row)
if err != nil {
return "", werror.WrapWithContext(err, map[string]interface{}{"row": row})
Expand Down
2 changes: 1 addition & 1 deletion pkg/otel/common/otlp/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func UpdateScopeFromRecord(
record arrow.Record,
row int,
ids *ScopeIds,
attrsStore *Attributes16Store,
attrsStore *AttributesStore[uint16],
) error {
scopeArray, err := arrowutils.StructFromRecord(record, ids.Scope, row)
if err != nil {
Expand Down
Loading