Skip to content

Commit

Permalink
Measure time spent on encoding and the compaction ratio
Browse files Browse the repository at this point in the history
Add metrics to measure time spent on encoding CBOR+ZSTD, and the
compaction ratio achieved by ZSTD.

Fixes #863
  • Loading branch information
masih committed Feb 4, 2025
1 parent 30fac2e commit 72b5e0d
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 10 deletions.
83 changes: 73 additions & 10 deletions internal/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package encoding

import (
"bytes"
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/klauspost/compress/zstd"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// maxDecompressedSize is the default maximum amount of memory allocated by the
Expand Down Expand Up @@ -37,15 +42,29 @@ func NewCBOR[T CBORMarshalUnmarshaler]() *CBOR[T] {
return &CBOR[T]{}
}

func (c *CBOR[T]) Encode(m T) ([]byte, error) {
func (c *CBOR[T]) Encode(m T) (_ []byte, _err error) {
defer func(start time.Time) {
if _err != nil {
metrics.encodingTime.Record(context.Background(),
time.Since(start).Seconds(),
metric.WithAttributeSet(attrSetCborEncode))
}

Check warning on line 51 in internal/encoding/encoding.go

View check run for this annotation

Codecov / codecov/patch

internal/encoding/encoding.go#L48-L51

Added lines #L48 - L51 were not covered by tests
}(time.Now())
var out bytes.Buffer
if err := m.MarshalCBOR(&out); err != nil {
return nil, err
}
return out.Bytes(), nil
}

func (c *CBOR[T]) Decode(v []byte, t T) error {
func (c *CBOR[T]) Decode(v []byte, t T) (_err error) {
defer func(start time.Time) {
if _err != nil {
metrics.encodingTime.Record(context.Background(),
time.Since(start).Seconds(),
metric.WithAttributeSet(attrSetCborDecode))
}

Check warning on line 66 in internal/encoding/encoding.go

View check run for this annotation

Codecov / codecov/patch

internal/encoding/encoding.go#L63-L66

Added lines #L63 - L66 were not covered by tests
}(time.Now())
r := bytes.NewReader(v)
return t.UnmarshalCBOR(r)
}
Expand All @@ -54,6 +73,9 @@ type ZSTD[T CBORMarshalUnmarshaler] struct {
cborEncoding *CBOR[T]
compressor *zstd.Encoder
decompressor *zstd.Decoder

metricAttr attribute.KeyValue
metricAttrLoader sync.Once
}

func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) {
Expand All @@ -74,26 +96,67 @@ func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) {
}, nil
}

func (c *ZSTD[T]) Encode(m T) ([]byte, error) {
cborEncoded, err := c.cborEncoding.Encode(m)
if len(cborEncoded) > maxDecompressedSize {
func (c *ZSTD[T]) Encode(t T) (_ []byte, _err error) {
decompressed, err := c.cborEncoding.Encode(t)
if len(decompressed) > maxDecompressedSize {
// Error out early if the encoded value is too large to be decompressed.
return nil, fmt.Errorf("encoded value cannot exceed maximum size: %d > %d", len(cborEncoded), maxDecompressedSize)
return nil, fmt.Errorf("encoded value cannot exceed maximum size: %d > %d", len(decompressed), maxDecompressedSize)

Check warning on line 103 in internal/encoding/encoding.go

View check run for this annotation

Codecov / codecov/patch

internal/encoding/encoding.go#L103

Added line #L103 was not covered by tests
}
if err != nil {
return nil, err
}
compressed := c.compressor.EncodeAll(cborEncoded, make([]byte, 0, len(cborEncoded)))

compressed := c.compress(decompressed)
c.meterCompactionRatio(t, len(decompressed), len(compressed))
return compressed, nil
}

func (c *ZSTD[T]) Decode(v []byte, t T) error {
func (c *ZSTD[T]) Decode(compressed []byte, t T) error {
buf := bufferPool.Get().(*[]byte)
defer bufferPool.Put(buf)

cborEncoded, err := c.decompressor.DecodeAll(v, (*buf)[:0])
decompressed, err := c.decompressInto(compressed, (*buf)[:0])
if err != nil {
return err
}
return c.cborEncoding.Decode(cborEncoded, t)
c.meterCompactionRatio(t, len(decompressed), len(compressed))
return c.cborEncoding.Decode(decompressed, t)
}

// compress zstd-compresses the given bytes into a freshly allocated
// buffer, recording the time taken with codec=zstd, action=encode.
func (c *ZSTD[T]) compress(decompressed []byte) []byte {
	start := time.Now()
	defer func() {
		metrics.encodingTime.Record(context.Background(),
			time.Since(start).Seconds(),
			metric.WithAttributeSet(attrSetZstdEncode))
	}()
	// Pre-size the destination to the uncompressed length; compressed
	// output is almost always smaller, so this avoids regrowth.
	dst := make([]byte, 0, len(decompressed))
	return c.compressor.EncodeAll(decompressed, dst)
}

// decompressInto decompresses the zstd payload, appending to buf.
//
// The time taken by a successful decompression is recorded with
// codec=zstd, action=decode. Record on success only — the previous
// condition (`_err != nil`) timed only failures, which both inverted
// the metric's intent and was inconsistent with compress, which records
// every call.
func (c *ZSTD[T]) decompressInto(compressed []byte, buf []byte) (_ []byte, _err error) {
	defer func(start time.Time) {
		if _err == nil {
			metrics.encodingTime.Record(context.Background(),
				time.Since(start).Seconds(),
				metric.WithAttributeSet(attrSetZstdDecode))
		}
	}(time.Now())
	return c.decompressor.DecodeAll(compressed, buf)
}

// meterCompactionRatio records the decompressed-to-compressed size ratio
// observed for a single encode or decode, attributed to the concrete
// type being handled.
func (c *ZSTD[T]) meterCompactionRatio(target T, decompressedSize, compressedSize int) {
	ratio := float64(decompressedSize) / float64(compressedSize)
	attrs := metric.WithAttributes(c.getMetricAttribute(target))
	metrics.zstdCompactionRatio.Record(context.Background(), ratio, attrs)
}

// getMetricAttribute returns the "type" attribute used to label metrics,
// derived from T's element type name (pointers are dereferenced). The
// reflection runs once per ZSTD instance and the result is cached.
func (c *ZSTD[T]) getMetricAttribute(t T) attribute.KeyValue {
	c.metricAttrLoader.Do(func() {
		const key = "type"
		typ := reflect.TypeOf(t)
		if typ.Kind() == reflect.Ptr {
			// Use the pointee's name so *Foo and Foo label identically.
			typ = typ.Elem()
		}
		c.metricAttr = attribute.String(key, typ.Name())
	})
	return c.metricAttr
}
7 changes: 7 additions & 0 deletions internal/encoding/encoding_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package encoding

import "go.opentelemetry.io/otel/attribute"

// GetMetricAttribute exposes getMetricAttribute so that tests outside
// this package can assert on the attribute used for metric collection.
// It must not be used in production code.
func (c *ZSTD[T]) GetMetricAttribute(t T) attribute.KeyValue {
	return c.getMetricAttribute(t)
}
18 changes: 18 additions & 0 deletions internal/encoding/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/require"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel/attribute"
)

var (
Expand Down Expand Up @@ -77,3 +78,20 @@ func TestZSTDLimits(t *testing.T) {
var dest testValue
require.ErrorContains(t, subject.Decode(tooLargeACompression, &dest), "decompressed size exceeds configured limit")
}

// TestZSTD_GetMetricAttribute verifies that the metric "type" attribute
// derived by the ZSTD codec names the underlying element type, both when
// the codec's type parameter is a pointer and when it is a plain value.
func TestZSTD_GetMetricAttribute(t *testing.T) {
	t.Run("By Pointer", func(t *testing.T) {
		// Pointer type parameter: expect the pointee's name, not "*testValue".
		subject, err := encoding.NewZSTD[*testValue]()
		require.NoError(t, err)
		require.Equal(t, attribute.String("type", "testValue"), subject.GetMetricAttribute(&testValue{}))
	})
	t.Run("By Value", func(t *testing.T) {
		// Value type parameter: expect the struct's own name.
		type anotherTestValue struct {
			cbg.CBORUnmarshaler
			cbg.CBORMarshaler
		}
		subject, err := encoding.NewZSTD[anotherTestValue]()
		require.NoError(t, err)
		require.Equal(t, attribute.String("type", "anotherTestValue"), subject.GetMetricAttribute(anotherTestValue{}))
	})
}
38 changes: 38 additions & 0 deletions internal/encoding/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package encoding

import (
"github.com/filecoin-project/go-f3/internal/measurements"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// Attribute sets and histograms for encoding instrumentation. The
// attribute sets are pre-built so the hot encode/decode paths avoid
// per-call set construction.
var (
	attrCodecCbor    = attribute.String("codec", "cbor")
	attrCodecZstd    = attribute.String("codec", "zstd")
	attrActionEncode = attribute.String("action", "encode")
	attrActionDecode = attribute.String("action", "decode")

	attrSetCborEncode = attribute.NewSet(attrCodecCbor, attrActionEncode)
	attrSetCborDecode = attribute.NewSet(attrCodecCbor, attrActionDecode)
	attrSetZstdEncode = attribute.NewSet(attrCodecZstd, attrActionEncode)
	attrSetZstdDecode = attribute.NewSet(attrCodecZstd, attrActionDecode)

	meter = otel.Meter("f3/internal/encoding")

	metrics = struct {
		encodingTime        metric.Float64Histogram
		zstdCompactionRatio metric.Float64Histogram
	}{
		encodingTime: measurements.Must(meter.Float64Histogram(
			"f3_internal_encoding_time",
			metric.WithDescription("The time spent on encoding/decoding in seconds."),
			metric.WithUnit("s"),
			metric.WithExplicitBucketBoundaries(0.001, 0.003, 0.005, 0.01, 0.03, 0.05, 0.1, 0.3, 0.5, 1.0, 2.0, 5.0, 10.0),
		)),
		zstdCompactionRatio: measurements.Must(meter.Float64Histogram(
			"f3_internal_encoding_zstd_compaction_ratio",
			// meterCompactionRatio records decompressed/compressed, so the
			// description must state the ratio in that order (the earlier
			// wording had it inverted).
			metric.WithDescription("The ratio of uncompressed to compressed data size for zstd encoding."),
			metric.WithExplicitBucketBoundaries(0.0, 0.1, 0.2, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0),
		)),
	}
)

0 comments on commit 72b5e0d

Please sign in to comment.