diff --git a/internal/encoding/encoding.go b/internal/encoding/encoding.go
index 65e03bb2..f50c321b 100644
--- a/internal/encoding/encoding.go
+++ b/internal/encoding/encoding.go
@@ -2,11 +2,16 @@ package encoding
 
 import (
 	"bytes"
+	"context"
 	"fmt"
+	"reflect"
 	"sync"
+	"time"
 
 	"github.com/klauspost/compress/zstd"
 	cbg "github.com/whyrusleeping/cbor-gen"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
 )
 
 // maxDecompressedSize is the default maximum amount of memory allocated by the
@@ -37,7 +42,14 @@ func NewCBOR[T CBORMarshalUnmarshaler]() *CBOR[T] {
 	return &CBOR[T]{}
 }
 
-func (c *CBOR[T]) Encode(m T) ([]byte, error) {
+func (c *CBOR[T]) Encode(m T) (_ []byte, _err error) {
+	defer func(start time.Time) {
+		if _err == nil {
+			metrics.encodingTime.Record(context.Background(),
+				time.Since(start).Seconds(),
+				metric.WithAttributeSet(attrSetCborEncode))
+		}
+	}(time.Now())
 	var out bytes.Buffer
 	if err := m.MarshalCBOR(&out); err != nil {
 		return nil, err
@@ -45,7 +57,14 @@ func (c *CBOR[T]) Encode(m T) ([]byte, error) {
 	return out.Bytes(), nil
 }
 
-func (c *CBOR[T]) Decode(v []byte, t T) error {
+func (c *CBOR[T]) Decode(v []byte, t T) (_err error) {
+	defer func(start time.Time) {
+		if _err == nil {
+			metrics.encodingTime.Record(context.Background(),
+				time.Since(start).Seconds(),
+				metric.WithAttributeSet(attrSetCborDecode))
+		}
+	}(time.Now())
 	r := bytes.NewReader(v)
 	return t.UnmarshalCBOR(r)
 }
@@ -54,6 +73,9 @@ type ZSTD[T CBORMarshalUnmarshaler] struct {
 	cborEncoding *CBOR[T]
 	compressor   *zstd.Encoder
 	decompressor *zstd.Decoder
+
+	metricAttr       attribute.KeyValue
+	metricAttrLoader sync.Once
 }
 
 func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) {
@@ -74,26 +96,67 @@ func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) {
 	}, nil
 }
 
-func (c *ZSTD[T]) Encode(m T) ([]byte, error) {
-	cborEncoded, err := c.cborEncoding.Encode(m)
-	if len(cborEncoded) > maxDecompressedSize {
+func (c *ZSTD[T]) Encode(t T) (_ []byte, _err error) {
+	decompressed, err := c.cborEncoding.Encode(t)
+	if len(decompressed) > maxDecompressedSize {
 		// Error out early if the encoded value is too large to be decompressed.
-		return nil, fmt.Errorf("encoded value cannot exceed maximum size: %d > %d", len(cborEncoded), maxDecompressedSize)
+		return nil, fmt.Errorf("encoded value cannot exceed maximum size: %d > %d", len(decompressed), maxDecompressedSize)
 	}
 	if err != nil {
 		return nil, err
 	}
-	compressed := c.compressor.EncodeAll(cborEncoded, make([]byte, 0, len(cborEncoded)))
+
+	compressed := c.compress(decompressed)
+	c.meterCompactionRatio(t, len(decompressed), len(compressed))
 	return compressed, nil
 }
 
-func (c *ZSTD[T]) Decode(v []byte, t T) error {
+func (c *ZSTD[T]) Decode(compressed []byte, t T) error {
 	buf := bufferPool.Get().(*[]byte)
 	defer bufferPool.Put(buf)
-	cborEncoded, err := c.decompressor.DecodeAll(v, (*buf)[:0])
+	decompressed, err := c.decompressInto(compressed, (*buf)[:0])
 	if err != nil {
 		return err
 	}
-	return c.cborEncoding.Decode(cborEncoded, t)
+	c.meterCompactionRatio(t, len(decompressed), len(compressed))
+	return c.cborEncoding.Decode(decompressed, t)
+}
+
+func (c *ZSTD[T]) compress(decompressed []byte) []byte {
+	defer func(start time.Time) {
+		metrics.encodingTime.Record(context.Background(),
+			time.Since(start).Seconds(),
+			metric.WithAttributeSet(attrSetZstdEncode))
+	}(time.Now())
+	return c.compressor.EncodeAll(decompressed, make([]byte, 0, len(decompressed)))
+}
+
+func (c *ZSTD[T]) decompressInto(compressed []byte, buf []byte) (_ []byte, _err error) {
+	defer func(start time.Time) {
+		if _err == nil {
+			metrics.encodingTime.Record(context.Background(),
+				time.Since(start).Seconds(),
+				metric.WithAttributeSet(attrSetZstdDecode))
+		}
+	}(time.Now())
+	return c.decompressor.DecodeAll(compressed, buf)
+}
+
+func (c *ZSTD[T]) meterCompactionRatio(target T, decompressedSize, compressedSize int) {
+	compactionRatio := float64(decompressedSize) / float64(compressedSize)
+	metrics.zstdCompactionRatio.Record(context.Background(), compactionRatio, metric.WithAttributes(c.getMetricAttribute(target)))
+}
+
+func (c *ZSTD[T]) getMetricAttribute(t T) attribute.KeyValue {
+	c.metricAttrLoader.Do(func() {
+		const key = "type"
+		switch target := reflect.TypeOf(t); {
+		case target.Kind() == reflect.Ptr:
+			c.metricAttr = attribute.String(key, target.Elem().Name())
+		default:
+			c.metricAttr = attribute.String(key, target.Name())
+		}
+	})
+	return c.metricAttr
+}
diff --git a/internal/encoding/encoding_api_test.go b/internal/encoding/encoding_api_test.go
new file mode 100644
index 00000000..18241042
--- /dev/null
+++ b/internal/encoding/encoding_api_test.go
@@ -0,0 +1,7 @@
+package encoding
+
+import "go.opentelemetry.io/otel/attribute"
+
+// GetMetricAttribute returns the attribute for metric collection, exported for
+// testing purposes.
+func (c *ZSTD[T]) GetMetricAttribute(t T) attribute.KeyValue { return c.getMetricAttribute(t) }
diff --git a/internal/encoding/encoding_test.go b/internal/encoding/encoding_test.go
index 1d186e9f..2d8a1c94 100644
--- a/internal/encoding/encoding_test.go
+++ b/internal/encoding/encoding_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/klauspost/compress/zstd"
 	"github.com/stretchr/testify/require"
 	cbg "github.com/whyrusleeping/cbor-gen"
+	"go.opentelemetry.io/otel/attribute"
 )
 
 var (
@@ -77,3 +78,20 @@ func TestZSTDLimits(t *testing.T) {
 	var dest testValue
 	require.ErrorContains(t, subject.Decode(tooLargeACompression, &dest), "decompressed size exceeds configured limit")
 }
+
+func TestZSTD_GetMetricAttribute(t *testing.T) {
+	t.Run("By Pointer", func(t *testing.T) {
+		subject, err := encoding.NewZSTD[*testValue]()
+		require.NoError(t, err)
+		require.Equal(t, attribute.String("type", "testValue"), subject.GetMetricAttribute(&testValue{}))
+	})
+	t.Run("By Value", func(t *testing.T) {
+		type anotherTestValue struct {
+			cbg.CBORUnmarshaler
+			cbg.CBORMarshaler
+		}
+		subject, err := encoding.NewZSTD[anotherTestValue]()
+		require.NoError(t, err)
+		require.Equal(t, attribute.String("type", "anotherTestValue"), subject.GetMetricAttribute(anotherTestValue{}))
+	})
+}
diff --git a/internal/encoding/metrics.go b/internal/encoding/metrics.go
new file mode 100644
index 00000000..09bf83b8
--- /dev/null
+++ b/internal/encoding/metrics.go
@@ -0,0 +1,38 @@
+package encoding
+
+import (
+	"github.com/filecoin-project/go-f3/internal/measurements"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+var (
+	attrCodecCbor     = attribute.String("codec", "cbor")
+	attrCodecZstd     = attribute.String("codec", "zstd")
+	attrActionEncode  = attribute.String("action", "encode")
+	attrActionDecode  = attribute.String("action", "decode")
+	attrSetCborEncode = attribute.NewSet(attrCodecCbor, attrActionEncode)
+	attrSetCborDecode = attribute.NewSet(attrCodecCbor, attrActionDecode)
+	attrSetZstdEncode = attribute.NewSet(attrCodecZstd, attrActionEncode)
+	attrSetZstdDecode = attribute.NewSet(attrCodecZstd, attrActionDecode)
+
+	meter = otel.Meter("f3/internal/encoding")
+
+	metrics = struct {
+		encodingTime        metric.Float64Histogram
+		zstdCompactionRatio metric.Float64Histogram
+	}{
+		encodingTime: measurements.Must(meter.Float64Histogram(
+			"f3_internal_encoding_time",
+			metric.WithDescription("The time spent on encoding/decoding in seconds."),
+			metric.WithUnit("s"),
+			metric.WithExplicitBucketBoundaries(0.001, 0.003, 0.005, 0.01, 0.03, 0.05, 0.1, 0.3, 0.5, 1.0, 2.0, 5.0, 10.0),
+		)),
+		zstdCompactionRatio: measurements.Must(meter.Float64Histogram(
+			"f3_internal_encoding_zstd_compaction_ratio",
+			metric.WithDescription("The ratio of decompressed to compressed data size for zstd encoding."),
+			metric.WithExplicitBucketBoundaries(0.0, 0.1, 0.2, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0),
+		)),
+	}
+)